# A simple Claude AI bot that uses the Claude API.
# (File-viewer header: branch "main", 16 kB.)
"""Bluesky bot that answers mentions with short replies generated by Claude.

The bot logs in to a Bluesky PDS, polls the notification feed for mentions of
``BOT_HANDLE``, asks the Anthropic Messages API for a reply, and posts that
reply (threaded over several posts when it exceeds the length limit).
"""

import os
import re
import time
import traceback
from datetime import datetime, timezone
from typing import List, Optional, Set

import anthropic
from atproto import Client
from atproto_client.models.app.bsky.feed.post import ReplyRef
from atproto_client.models.app.bsky.notification.list_notifications import Params
from atproto_client.models.com.atproto.repo.strong_ref import Main as StrongRef
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configuration
BLUESKY_HANDLE = os.getenv("BLUESKY_HANDLE")
BLUESKY_PASSWORD = os.getenv("BLUESKY_PASSWORD")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
BOT_HANDLE = os.getenv("BOT_HANDLE", "claude.altq.net")

PDS_URL = "https://altq.net"
CLAUDE_MODEL = "claude-sonnet-4-5-20250929"
CLAUDE_MAX_TOKENS = 20000
MAX_POST_LENGTH = 280
NOTIFICATION_PAGE_SIZE = 50
INITIAL_BATCH_SIZE = 10
INITIAL_DELAY_SECONDS = 2
POLL_INTERVAL_SECONDS = 10

if not BLUESKY_HANDLE or not BLUESKY_PASSWORD:
    raise ValueError("BLUESKY_HANDLE and BLUESKY_PASSWORD must be set in environment variables")

if not ANTHROPIC_API_KEY:
    raise ValueError("ANTHROPIC_API_KEY must be set in environment variables")

# Initialize clients
anthropic_client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
# Use custom PDS at altq.net
bluesky_client = Client(base_url=PDS_URL)

# Track processed posts to avoid duplicates (in-memory only; reset on restart)
processed_posts: Set[str] = set()


def extract_post_text(record) -> str:
    """Return the ``text`` of a post record, or ``""`` when it has none.

    Accepts either an object with a ``text`` attribute or a dict with a
    ``"text"`` key. A ``None`` text is normalised to the empty string so
    callers can always treat the result as ``str``.
    """
    if hasattr(record, 'text'):
        return record.text or ""
    if isinstance(record, dict) and 'text' in record:
        return record['text'] or ""
    return ""


def mentions_bot(text: str, bot_handle: str) -> bool:
    """Return True when ``text`` contains ``@<bot_handle>`` (case-insensitive).

    ``bot_handle`` may be given with or without a leading ``@``.
    """
    normalized_handle = bot_handle.lower().replace("@", "")
    return f"@{normalized_handle}" in text.lower()


def extract_question(text: str, bot_handle: str) -> str:
    """Strip the bot mention from ``text`` and return what is left.

    Every ``@<bot_handle>`` occurrence (case-insensitive) is removed together
    with the whitespace that follows it; one further leading ``@handle`` is
    dropped as well. When nothing remains, a default greeting is returned so
    the model always has something to answer.
    """
    normalized_handle = bot_handle.lower().replace("@", "")

    # Remove the mention from the text
    mention_pattern = re.compile(rf"@{re.escape(normalized_handle)}\s*", re.IGNORECASE)
    question = mention_pattern.sub("", text).strip()

    # Remove any other mention at the start (handles may contain hyphens)
    question = re.sub(r"^@[\w.-]+\s+", "", question)

    return question if question else "Hello! How can I help you?"


def get_claude_response(question: str) -> str:
    """Ask Claude for a short, post-sized answer to ``question``.

    Returns the text of the first text block in the response, or a fixed
    apology string when the response holds no text block.

    Raises:
        Exception: any error from the Anthropic client is logged and
            re-raised unchanged.
    """
    try:
        prompt = f"""You will be acting as a simple bot that posts on Bluesky (a social media platform similar to Twitter). You will be given a topic or prompt to respond to.

<topic>
{question}
</topic>

Here are the guidelines for your response:

- Keep your response short and concise (similar to a tweet - aim for 1-2 sentences maximum)
- Write in a casual, friendly tone appropriate for social media
- Stay on topic and provide a relevant response to the given prompt
- Avoid controversial topics, offensive language, or anything inappropriate for a public social media post
- Do not include hashtags, mentions (@), or special formatting unless specifically relevant to the topic
- Write as if you're a helpful, conversational bot engaging with the Bluesky community

Your response should be brief, engaging, and suitable for posting directly on Bluesky. Write only the post content - do not include any explanations, meta-commentary, or additional text beyond what would appear in the actual social media post."""

        message = anthropic_client.messages.create(
            model=CLAUDE_MODEL,
            max_tokens=CLAUDE_MAX_TOKENS,
            temperature=1,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": prompt
                        }
                    ]
                }
            ]
        )

        # Return the first block that actually carries text; blocks may be
        # SDK objects or plain dicts.
        for content_block in message.content or []:
            if hasattr(content_block, 'text'):
                return content_block.text
            if isinstance(content_block, dict) and 'text' in content_block:
                return content_block['text']

        return "I'm sorry, I couldn't generate a text response."
    except Exception as error:
        print(f"Error calling Claude API: {error}")
        traceback.print_exc()
        raise


def _pack_pieces(pieces: List[str], separator: str, max_length: int) -> List[str]:
    """Greedily join ``pieces`` with ``separator`` into chunks of at most ``max_length``.

    A single piece longer than ``max_length`` is emitted as its own
    (over-long) chunk; the caller decides how to handle that.
    """
    chunks: List[str] = []
    current = ""
    for piece in pieces:
        candidate = f"{current}{separator}{piece}" if current else piece
        if len(candidate) <= max_length:
            current = candidate
        else:
            if current:
                chunks.append(current)
            current = piece
    if current:
        chunks.append(current)
    return chunks


def split_text(text: str, max_length: int = 280) -> List[str]:
    """Split ``text`` into chunks of at most ``max_length`` characters.

    Text that already fits is returned as a single chunk. Otherwise the text
    is packed sentence by sentence (split on ``". "``, keeping each
    sentence's period). If any sentence is itself too long, the text is
    re-packed word by word, and words longer than ``max_length`` are cut
    into fixed-size slices so that no chunk ever exceeds the limit.

    Raises:
        ValueError: if ``max_length`` is smaller than 1.
    """
    if max_length < 1:
        raise ValueError("max_length must be at least 1")

    if len(text) <= max_length:
        return [text]

    # Re-attach the period that str.split() consumed so a chunk that ends on
    # a sentence boundary keeps its punctuation.
    parts = text.split('. ')
    sentences = [part + '.' for part in parts[:-1]] + parts[-1:]
    chunks = _pack_pieces(sentences, ' ', max_length)

    # If still too long, split by words
    if any(len(chunk) > max_length for chunk in chunks):
        words: List[str] = []
        for word in text.split():
            # Hard-split tokens (e.g. long URLs) that can never fit in a chunk.
            words.extend(
                word[start:start + max_length]
                for start in range(0, len(word), max_length)
            )
        chunks = _pack_pieces(words, ' ', max_length)

    return chunks


def _find_attr(response, name: str):
    """Look up ``name`` on ``response`` or on its ``value`` / ``data`` attribute.

    Returns ``None`` when the attribute is missing everywhere or is ``None``.
    """
    holders = (response, getattr(response, 'value', None), getattr(response, 'data', None))
    for holder in holders:
        found = getattr(holder, name, None) if holder is not None else None
        if found is not None:
            return found
    return None


def reply_to_post(parent_uri: str, parent_cid: str, text: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None):
    """Post ``text`` as a reply on Bluesky, threading it if it is too long.

    Args:
        parent_uri: URI of the post being replied to.
        parent_cid: CID of the post being replied to.
        text: Reply text; split with ``split_text`` into one post per chunk.
        root_uri: URI of the thread root; defaults to ``parent_uri``.
        root_cid: CID of the thread root; defaults to ``parent_cid``.

    Raises:
        Exception: any error raised while posting is logged and re-raised.
    """
    try:
        # Use root if provided, otherwise use parent as root
        root_ref = StrongRef(uri=root_uri or parent_uri, cid=root_cid or parent_cid)

        # The first chunk replies to the parent; each later chunk replies to
        # the chunk posted just before it.
        last_uri = parent_uri
        last_cid = parent_cid

        for chunk in split_text(text, max_length=MAX_POST_LENGTH):
            reply_ref = ReplyRef(
                root=root_ref,
                parent=StrongRef(uri=last_uri, cid=last_cid)
            )

            response = bluesky_client.send_post(
                text=chunk,
                reply_to=reply_ref
            )

            # Advance the parent reference only when both halves are known,
            # so the URI and CID never point at different posts.
            new_uri = _find_attr(response, 'uri') if response else None
            new_cid = _find_attr(response, 'cid') if response else None
            if new_uri is not None and new_cid is not None:
                last_uri = new_uri
                last_cid = new_cid

        print(f"Replied to post: {parent_uri}")
    except Exception as error:
        print(f"Error posting reply: {error}")
        traceback.print_exc()
        raise


def _fetch_record(uri: str):
    """Fetch the record of the post at ``uri`` through the thread API.

    Returns ``None`` when the response does not contain a post record.
    Errors from the client propagate to the caller.
    """
    thread_response = bluesky_client.get_post_thread(uri)
    thread_data = getattr(thread_response, 'thread', None)
    post = getattr(thread_data, 'post', None)
    return getattr(post, 'record', None)


def process_notification(notification):
    """Answer a single notification if it is a new mention of the bot.

    Notifications that are not mentions, lack a URI/CID, were already
    handled, were authored by the bot itself, or do not contain the bot
    handle in their text are ignored. Errors are logged and swallowed so one
    bad notification cannot stop the polling loop.
    """
    try:
        # Only process mentions
        if getattr(notification, 'reason', None) != "mention":
            return

        uri = getattr(notification, 'uri', None)
        cid = getattr(notification, 'cid', None)

        if not uri or not cid:
            return

        # Skip if we've already processed this post
        if uri in processed_posts:
            return

        # Skip if it's our own post (compare case-insensitively)
        author = getattr(notification, 'author', None)
        author_handle = getattr(author, 'handle', None)
        if author_handle and author_handle.lower() == BLUESKY_HANDLE.lower():
            return

        # Get the post record
        record = getattr(notification, 'record', None)

        if not record:
            # Try to fetch the post using the thread API
            try:
                record = _fetch_record(uri)
            except Exception as error:
                print(f"Could not fetch post {uri}: {error}")
                return

        if not record:
            print("No record found in notification")
            return

        # Extract text from record
        text = extract_post_text(record)

        # Check if the post mentions the bot
        if not mentions_bot(text, BOT_HANDLE):
            return

        author_handle = author_handle or "unknown"
        print(f"Processing mention from @{author_handle}: {text[:100]}...")

        # Mark as processed immediately to avoid duplicate processing
        processed_posts.add(uri)

        # Extract the question
        question = extract_question(text, BOT_HANDLE)
        print(f"Question: {question[:100]}...")

        # Get Claude's response
        response = get_claude_response(question)
        print(f"Claude response: {response[:100]}...")

        # Determine root post (if this is a reply, use the root; otherwise use this post)
        root_uri = uri
        root_cid = cid
        reply_data = getattr(record, 'reply', None)
        if reply_data and hasattr(reply_data, 'root'):
            root = reply_data.root
            root_uri = getattr(root, 'uri', uri)
            root_cid = getattr(root, 'cid', cid)

        # Reply to the post
        reply_to_post(uri, cid, response, root_uri, root_cid)

        print(f"Successfully replied to @{author_handle}")
    except Exception as error:
        print(f"Error processing notification: {error}")
        traceback.print_exc()
        # Don't re-raise, just log - we don't want one error to stop processing


def _notification_timestamp(notification) -> Optional[datetime]:
    """Return the notification's indexed-at time as an aware UTC ``datetime``.

    Reads ``indexed_at`` (or ``indexedAt``). Returns ``None`` when the field
    is missing or cannot be parsed. Naive values are taken to be UTC so every
    returned timestamp can be compared with every other.
    """
    raw = getattr(notification, 'indexed_at', None) or getattr(notification, 'indexedAt', None)
    if not raw:
        return None

    if isinstance(raw, datetime):
        parsed = raw
    else:
        try:
            parsed = datetime.fromisoformat(str(raw).replace('Z', '+00:00'))
        except ValueError:
            print(f"Could not parse notification timestamp: {raw}")
            return None

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _extract_notifications(response) -> list:
    """Pull the notification list out of a ``list_notifications`` response.

    Handles the shapes the original code allowed for: an object with
    ``notifications``, an object with ``data.notifications``, a dict, or a
    bare list. Anything else yields an empty list.
    """
    if hasattr(response, 'notifications'):
        return response.notifications
    if hasattr(response, 'data') and hasattr(response.data, 'notifications'):
        return response.data.notifications
    if isinstance(response, dict) and 'notifications' in response:
        return response['notifications']
    if isinstance(response, list):
        return response
    return []


def _reauthenticate_if_needed(error: Exception) -> None:
    """Log in again when ``error`` looks like an authentication failure.

    A failed re-login is logged rather than raised, so a transient outage
    does not terminate the polling loop; the next poll will try again.
    """
    message = str(error).lower()
    if not any(marker in message for marker in ("expired", "unauthorized", "auth")):
        return

    print("Session expired, re-authenticating...")
    try:
        bluesky_client.login(login=BLUESKY_HANDLE, password=BLUESKY_PASSWORD)
    except Exception as login_error:
        print(f"Re-authentication failed: {login_error}")
        traceback.print_exc()


def _poll_notifications(last_seen_timestamp: Optional[datetime]) -> Optional[datetime]:
    """Fetch notifications, process the new ones, and return the updated watermark.

    Args:
        last_seen_timestamp: Timestamp of the newest notification seen so
            far, or ``None`` on the first poll.

    Returns:
        The new watermark: the timestamp of the first notification in the
        response when it is later than ``last_seen_timestamp``, otherwise
        ``last_seen_timestamp`` unchanged.
    """
    try:
        params = Params(limit=NOTIFICATION_PAGE_SIZE)
        response = bluesky_client.app.bsky.notification.list_notifications(params=params)
        notifications = _extract_notifications(response)

        # Filter to only process new notifications
        if last_seen_timestamp is not None:
            new_notifications = []
            for notification in notifications:
                notif_date = _notification_timestamp(notification)
                if notif_date is not None and notif_date > last_seen_timestamp:
                    new_notifications.append(notification)
        else:
            # On first run, only process the 10 most recent
            new_notifications = notifications[:INITIAL_BATCH_SIZE]

        # Process each new notification
        for notification in new_notifications:
            process_notification(notification)

        # Update last seen timestamp from the first entry of the response
        if notifications:
            latest_timestamp = _notification_timestamp(notifications[0])
            if latest_timestamp is not None and (
                last_seen_timestamp is None or latest_timestamp > last_seen_timestamp
            ):
                last_seen_timestamp = latest_timestamp
    except Exception as error:
        print(f"Error polling notifications: {error}")
        traceback.print_exc()
        # Try to re-authenticate on error
        _reauthenticate_if_needed(error)

    return last_seen_timestamp


def main():
    """Log in to Bluesky and poll for mentions until interrupted.

    Raises:
        Exception: a failure outside the polling step (for example the
            initial login) is logged and re-raised. ``KeyboardInterrupt``
            ends the loop quietly.
    """
    try:
        # Login to Bluesky
        print(f"Logging in as @{BLUESKY_HANDLE}...")
        bluesky_client.login(login=BLUESKY_HANDLE, password=BLUESKY_PASSWORD)
        print("Logged in successfully!")

        # Get the bot's profile (informational only; failure is not fatal)
        try:
            profile = bluesky_client.get_profile(actor=BLUESKY_HANDLE)
            handle = getattr(profile, 'handle', BLUESKY_HANDLE)
            did = getattr(profile, 'did', "unknown")
            print(f"Bot profile: @{handle} ({did})")
        except Exception as e:
            print(f"Could not get profile: {e}")

        print(f"Listening for mentions of @{BOT_HANDLE}")

        # Track the latest notification timestamp to only process new ones
        last_seen_timestamp: Optional[datetime] = None

        # Initial poll after a short delay
        print("Waiting 2 seconds before initial poll...")
        time.sleep(INITIAL_DELAY_SECONDS)
        last_seen_timestamp = _poll_notifications(last_seen_timestamp)

        # Continue polling
        while True:
            time.sleep(POLL_INTERVAL_SECONDS)
            last_seen_timestamp = _poll_notifications(last_seen_timestamp)

    except KeyboardInterrupt:
        print("\nShutting down gracefully...")
    except Exception as error:
        print(f"Fatal error: {error}")
        traceback.print_exc()
        raise


if __name__ == "__main__":
    main()