"""A simple Claude AI bot for Bluesky that replies to mentions using the Claude API."""
import os
import re
import time
import traceback
from datetime import datetime, timezone
from typing import Optional, Set

import anthropic
from atproto import Client
from dotenv import load_dotenv
9
# Load environment variables
load_dotenv()

# Configuration: every value is read from the process environment.
BLUESKY_HANDLE = os.getenv("BLUESKY_HANDLE")
BLUESKY_PASSWORD = os.getenv("BLUESKY_PASSWORD")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
# Handle whose @-mention triggers a reply; falls back to the default below.
BOT_HANDLE = os.getenv("BOT_HANDLE", "claude.altq.net")

# Fail at import time when credentials are missing, before any client is built.
if not BLUESKY_HANDLE or not BLUESKY_PASSWORD:
    raise ValueError("BLUESKY_HANDLE and BLUESKY_PASSWORD must be set in environment variables")

if not ANTHROPIC_API_KEY:
    raise ValueError("ANTHROPIC_API_KEY must be set in environment variables")

# Initialize clients
anthropic_client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
# Use custom PDS at altq.net
bluesky_client = Client(base_url="https://altq.net")

# Track processed posts to avoid duplicates (in-memory only; reset on restart).
processed_posts: Set[str] = set()
32
33
def extract_post_text(record) -> str:
    """Return the text of a post record, or ``""`` when it has none.

    Args:
        record: Either an object exposing a ``text`` attribute or a dict with
            a ``"text"`` key. Any other value is treated as having no text.

    Returns:
        The record's text. A missing, ``None`` or non-string value yields the
        empty string so callers can always treat the result as ``str``.
    """
    if hasattr(record, "text"):
        text = record.text
    elif isinstance(record, dict):
        text = record.get("text")
    else:
        text = None
    return text if isinstance(text, str) else ""
41
42
def mentions_bot(text: str, bot_handle: str) -> bool:
    """Check whether ``text`` contains an @-mention of ``bot_handle``.

    The comparison is case-insensitive and anchored on handle boundaries, so
    ``@claude.altq.net`` is not considered mentioned by
    ``@claude.altq.network`` or ``@claude.altq.net.example.com``. A trailing
    sentence period (``... @claude.altq.net.``) still counts as a mention.

    Args:
        text: The post text to search.
        bot_handle: The bot's handle, with or without a leading ``@``.

    Returns:
        True if the handle is mentioned; False otherwise, including when
        ``text`` or the handle is empty.
    """
    normalized_handle = bot_handle.strip().lstrip("@").lower()
    # An empty handle would reduce the search to a bare "@", matching any mention.
    if not text or not normalized_handle:
        return False

    pattern = (
        rf"(?<![\w.-])@{re.escape(normalized_handle)}"
        # Reject a longer handle: no handle character may follow directly,
        # nor a dot that itself starts another handle segment.
        r"(?![\w-])(?!\.[\w-])"
    )
    return re.search(pattern, text, re.IGNORECASE) is not None
50
51
def extract_question(text: str, bot_handle: str) -> str:
    """Extract the actual question/request from a post that mentions the bot.

    Every mention of the bot is removed (case-insensitively, matching the
    whole handle only), then any run of other ``@handle`` mentions at the very
    start of the remaining text is dropped.

    Args:
        text: The full post text.
        bot_handle: The bot's handle, with or without a leading ``@``.

    Returns:
        The remaining text, or a default greeting when nothing is left.
    """
    normalized_handle = bot_handle.strip().lstrip("@").lower()

    question = text
    if normalized_handle:
        # Boundary assertions keep a longer handle that merely starts with
        # the bot's handle (e.g. "@bot.network") from being cut in half.
        mention_pattern = re.compile(
            rf"(?<![\w.-])@{re.escape(normalized_handle)}(?![\w-])(?!\.[\w-])\s*",
            re.IGNORECASE,
        )
        question = mention_pattern.sub("", question)
    question = question.strip()

    # Remove all other mentions at the start (handles may contain hyphens).
    question = re.sub(r"^(?:@[\w.-]+\s+)+", "", question)

    return question if question else "Hello! How can I help you?"
63 return question if question else "Hello! How can I help you?"
64
65
def get_claude_response(
    question: str,
    model: str = "claude-sonnet-4-5-20250929",
    max_tokens: int = 20000,
) -> str:
    """Ask Claude for a short, post-ready reply to ``question``.

    Args:
        question: The user's request; it is embedded in the prompt between
            ``<topic>`` tags.
        model: Model identifier passed to the Messages API.
        max_tokens: Upper bound on generated tokens passed to the Messages API.

    Returns:
        The concatenated text of every text block in the response, stripped of
        surrounding whitespace, or a fixed apology string when the response
        carries no text.

    Raises:
        Exception: Any error raised while calling the API or reading its
            result is logged with a traceback and re-raised unchanged.
    """
    prompt = f"""You will be acting as a simple bot that posts on Bluesky (a social media platform similar to Twitter). You will be given a topic or prompt to respond to.

<topic>
{question}
</topic>

Here are the guidelines for your response:

- Keep your response short and concise (similar to a tweet - aim for 1-2 sentences maximum)
- Write in a casual, friendly tone appropriate for social media
- Stay on topic and provide a relevant response to the given prompt
- Avoid controversial topics, offensive language, or anything inappropriate for a public social media post
- Do not include hashtags, mentions (@), or special formatting unless specifically relevant to the topic
- Write as if you're a helpful, conversational bot engaging with the Bluesky community

Your response should be brief, engaging, and suitable for posting directly on Bluesky. Write only the post content - do not include any explanations, meta-commentary, or additional text beyond what would appear in the actual social media post."""

    try:
        message = anthropic_client.messages.create(
            model=model,
            max_tokens=max_tokens,
            temperature=1,
            messages=[
                {
                    "role": "user",
                    "content": [{"type": "text", "text": prompt}],
                }
            ],
        )

        # Collect text from every block, not just the first one, so a leading
        # non-text block does not hide the actual answer.
        text_parts = []
        for content_block in message.content or []:
            if hasattr(content_block, "text"):
                block_text = content_block.text
            elif isinstance(content_block, dict):
                block_text = content_block.get("text")
            else:
                block_text = None
            if isinstance(block_text, str):
                text_parts.append(block_text)

        combined = "".join(text_parts).strip()
        if combined:
            return combined

        return "I'm sorry, I couldn't generate a text response."
    except Exception as error:
        print(f"Error calling Claude API: {error}")
        traceback.print_exc()
        raise
117
118
def split_text(text: str, max_length: int = 280) -> list:
    """Split text into chunks that each fit within ``max_length`` characters.

    Chunks are filled greedily with whole sentences (ending in ``.``, ``!`` or
    ``?`` followed by whitespace), keeping the sentence punctuation and the
    original whitespace between sentences that share a chunk. A sentence that
    is too long on its own is packed word by word, and a single word longer
    than ``max_length`` is cut into ``max_length``-sized pieces, so no
    returned chunk ever exceeds the limit.

    Args:
        text: The text to split.
        max_length: Maximum number of characters per chunk; must be positive.

    Returns:
        A list of chunks in order. Text that already fits is returned
        unchanged as a single-element list.

    Raises:
        ValueError: If ``max_length`` is not positive.
    """
    if max_length <= 0:
        raise ValueError("max_length must be a positive integer")
    if len(text) <= max_length:
        return [text]

    # The capturing group keeps the whitespace separators in the result:
    # [sentence, separator, sentence, separator, ..., sentence].
    tokens = re.split(r"((?<=[.!?])\s+)", text.strip())
    sentences = tokens[0::2]
    separators = [""] + tokens[1::2]

    chunks = []
    current = ""
    for separator, sentence in zip(separators, sentences):
        if not sentence:
            continue

        candidate = current + separator + sentence if current else sentence
        if len(candidate) <= max_length:
            current = candidate
            continue

        # The sentence does not fit after what we have: close the open chunk.
        if current:
            chunks.append(current)
            current = ""

        if len(sentence) <= max_length:
            current = sentence
            continue

        # The sentence alone is too long: pack it word by word.
        for word in sentence.split():
            # Hard-split a word that can never fit in one chunk.
            while len(word) > max_length:
                if current:
                    chunks.append(current)
                    current = ""
                chunks.append(word[:max_length])
                word = word[max_length:]
            if not word:
                continue

            candidate = f"{current} {word}" if current else word
            if len(candidate) <= max_length:
                current = candidate
            else:
                chunks.append(current)
                current = word

    if current:
        chunks.append(current)

    return chunks
158 return chunks
159
160
def _response_field(response, name: str):
    """Return ``name`` from ``response`` or from its ``value``/``data`` attribute, else None."""
    holders = (
        response,
        getattr(response, "value", None),
        getattr(response, "data", None),
    )
    for holder in holders:
        if holder is None:
            continue
        found = getattr(holder, name, None)
        if found:
            return found
    return None


def reply_to_post(parent_uri: str, parent_cid: str, text: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None):
    """Post ``text`` to Bluesky as a reply, threading it when it needs several posts.

    The text is split into chunks of at most 280 characters. The first chunk
    replies to the parent post; each later chunk replies to the chunk posted
    before it. All chunks share the same thread root.

    Args:
        parent_uri: URI of the post being replied to.
        parent_cid: CID of the post being replied to.
        text: The reply text.
        root_uri: URI of the thread root. The parent is used as the root
            unless both ``root_uri`` and ``root_cid`` are given.
        root_cid: CID of the thread root (see ``root_uri``).

    Raises:
        Exception: Any error while posting is logged with a traceback and
            re-raised unchanged.
    """
    try:
        # Import the reply reference models
        from atproto_client.models.app.bsky.feed.post import ReplyRef
        from atproto_client.models.com.atproto.repo.strong_ref import Main as StrongRef

        # Use the root only as a complete uri/cid pair; a half-specified root
        # would otherwise be mixed with the parent's identifiers.
        if root_uri and root_cid:
            root_ref = StrongRef(uri=root_uri, cid=root_cid)
        else:
            root_ref = StrongRef(uri=parent_uri, cid=parent_cid)

        # Split text if needed, skipping chunks with nothing to post.
        chunks = [chunk for chunk in split_text(text, max_length=280) if chunk.strip()]
        if not chunks:
            print(f"Nothing to post in reply to: {parent_uri}")
            return

        parent_ref = StrongRef(uri=parent_uri, cid=parent_cid)
        for chunk in chunks:
            response = bluesky_client.send_post(
                text=chunk,
                reply_to=ReplyRef(root=root_ref, parent=parent_ref),
            )

            # Chain the next chunk onto this one, but only when the response
            # yields both identifiers, so uri and cid always belong together.
            new_uri = _response_field(response, "uri")
            new_cid = _response_field(response, "cid")
            if new_uri and new_cid:
                parent_ref = StrongRef(uri=new_uri, cid=new_cid)

        print(f"Replied to post: {parent_uri}")
    except Exception as error:
        print(f"Error posting reply: {error}")
        traceback.print_exc()
        raise
219 raise
220
221
def _record_field(obj, name: str):
    """Read ``name`` from a dict key or an object attribute; None when absent."""
    if isinstance(obj, dict):
        return obj.get(name)
    return getattr(obj, name, None)


def _reply_root(record, default_uri: str, default_cid: str):
    """Return the (uri, cid) of the thread root referenced by ``record.reply``.

    Falls back to the given defaults when the record is not a reply or the
    root reference lacks either identifier, so the pair is never mixed.
    """
    reply = _record_field(record, "reply")
    root = _record_field(reply, "root") if reply else None
    if root is not None:
        root_uri = _record_field(root, "uri")
        root_cid = _record_field(root, "cid")
        if root_uri and root_cid:
            return root_uri, root_cid
    return default_uri, default_cid


def process_notification(notification):
    """Answer a single mention notification with a Claude-generated reply.

    Notifications are ignored when they are not mentions, lack a uri/cid,
    were already handled, come from the bot's own handle, or whose text does
    not mention ``BOT_HANDLE``. Otherwise the post is recorded in
    ``processed_posts``, the question is extracted, Claude is asked for a
    response, and the response is posted as a reply in the same thread.

    Errors are logged with a traceback and swallowed so that one failing
    notification does not stop the caller from processing the others.

    Args:
        notification: A notification object as returned by the Bluesky client.
    """
    try:
        # Only process mentions
        if getattr(notification, "reason", None) != "mention":
            return

        uri = getattr(notification, "uri", None)
        cid = getattr(notification, "cid", None)
        if not uri or not cid:
            return

        # Skip if we've already processed this post
        if uri in processed_posts:
            return

        # Skip if it's our own post
        author = getattr(notification, "author", None)
        author_handle = getattr(author, "handle", None)
        if author_handle == BLUESKY_HANDLE:
            return

        # Get the post record
        record = getattr(notification, "record", None)
        if not record:
            # Try to fetch the post using the thread API
            try:
                thread_response = bluesky_client.get_post_thread(uri)
            except Exception as error:
                print(f"Could not fetch post {uri}: {error}")
                return
            thread_post = getattr(getattr(thread_response, "thread", None), "post", None)
            record = getattr(thread_post, "record", None)

        if not record:
            print("No record found in notification")
            return

        # Extract text from record
        text = extract_post_text(record)

        # Check if the post mentions the bot
        if not mentions_bot(text, BOT_HANDLE):
            return

        author_label = author_handle or "unknown"
        print(f"Processing mention from @{author_label}: {text[:100]}...")

        # Mark as processed immediately to avoid duplicate processing
        processed_posts.add(uri)

        # Extract the question
        question = extract_question(text, BOT_HANDLE)
        print(f"Question: {question[:100]}...")

        # Get Claude's response
        response = get_claude_response(question)
        print(f"Claude response: {response[:100]}...")

        # Determine root post (if this is a reply, use the root; otherwise use this post)
        root_uri, root_cid = _reply_root(record, uri, cid)

        # Reply to the post
        reply_to_post(uri, cid, response, root_uri, root_cid)

        print(f"Successfully replied to @{author_label}")
    except Exception as error:
        print(f"Error processing notification: {error}")
        traceback.print_exc()
        # Don't re-raise, just log - we don't want one error to stop processing
302 # Don't re-raise, just log - we don't want one error to stop processing
303
304
_FRACTION_RE = re.compile(r"\.(\d+)")


def _parse_indexed_at(value) -> Optional[datetime]:
    """Convert a notification timestamp to a timezone-aware datetime.

    Accepts a ``datetime`` or an ISO 8601 string (a trailing ``Z`` is read as
    UTC). Returns None when the value cannot be parsed.
    """
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, str):
        text = value.strip()
        if text[-1:] in ("Z", "z"):
            text = text[:-1] + "+00:00"
        # Normalize fractional seconds to six digits: datetime.fromisoformat
        # before Python 3.11 only accepts 3- or 6-digit fractions.
        text = _FRACTION_RE.sub(
            lambda match: "." + match.group(1)[:6].ljust(6, "0"), text, count=1
        )
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
    else:
        return None

    # Naive and aware datetimes cannot be compared; assume UTC when no offset
    # is present (NOTE(review): confirm the service always reports UTC).
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _notification_timestamp(notification) -> Optional[datetime]:
    """Return the parsed ``indexed_at``/``indexedAt`` of a notification, or None."""
    raw = getattr(notification, "indexed_at", None) or getattr(notification, "indexedAt", None)
    return _parse_indexed_at(raw) if raw else None


def _extract_notifications(response) -> list:
    """Pull the notifications list out of the response shapes this bot handles."""
    if hasattr(response, "notifications"):
        return response.notifications or []
    data = getattr(response, "data", None)
    if hasattr(data, "notifications"):
        return data.notifications or []
    if isinstance(response, dict) and "notifications" in response:
        return response["notifications"] or []
    if isinstance(response, list):
        return response
    return []


def main(poll_interval: float = 10, initial_delay: float = 2):
    """Log in to Bluesky and poll for mentions until interrupted.

    On the first poll only the first 10 notifications of the listing are
    processed; afterwards only notifications indexed after the newest
    timestamp seen so far are processed.

    Args:
        poll_interval: Seconds to sleep between polls.
        initial_delay: Seconds to wait after login before the first poll.

    Raises:
        Exception: Errors outside the polling step (such as a failed initial
            login) are logged as fatal and re-raised. ``KeyboardInterrupt``
            ends the loop without an error.
    """
    try:
        # Login to Bluesky
        print(f"Logging in as @{BLUESKY_HANDLE}...")
        bluesky_client.login(login=BLUESKY_HANDLE, password=BLUESKY_PASSWORD)
        print("Logged in successfully!")

        # Get the bot's profile (informational only; failure is not fatal)
        try:
            profile = bluesky_client.get_profile(actor=BLUESKY_HANDLE)
            handle = getattr(profile, "handle", BLUESKY_HANDLE)
            did = getattr(profile, "did", "unknown")
            print(f"Bot profile: @{handle} ({did})")
        except Exception as e:
            print(f"Could not get profile: {e}")

        print(f"Listening for mentions of @{BOT_HANDLE}")

        from atproto_client.models.app.bsky.notification.list_notifications import Params

        # Track the latest notification timestamp to only process new ones
        last_seen_timestamp: Optional[datetime] = None

        def poll_notifications():
            """Fetch notifications once and process the ones not seen before."""
            nonlocal last_seen_timestamp
            try:
                response = bluesky_client.app.bsky.notification.list_notifications(
                    params=Params(limit=50)
                )
                notifications = _extract_notifications(response)
                stamped = [(item, _notification_timestamp(item)) for item in notifications]

                if last_seen_timestamp is None:
                    # On first run, only process the 10 most recent
                    new_notifications = notifications[:10]
                else:
                    # Notifications without a usable timestamp are skipped.
                    new_notifications = [
                        item
                        for item, stamp in stamped
                        if stamp is not None and stamp > last_seen_timestamp
                    ]

                # Process each new notification
                for notification in new_notifications:
                    process_notification(notification)

                # Advance the watermark to the newest timestamp in the batch,
                # without relying on the order of the listing.
                known_stamps = [stamp for _, stamp in stamped if stamp is not None]
                if known_stamps:
                    newest = max(known_stamps)
                    if last_seen_timestamp is None or newest > last_seen_timestamp:
                        last_seen_timestamp = newest
            except Exception as error:
                print(f"Error polling notifications: {error}")
                traceback.print_exc()
                # Try to re-authenticate on what looks like an auth error
                message = str(error).lower()
                if "expired" in message or "auth" in message:
                    print("Session expired, re-authenticating...")
                    try:
                        bluesky_client.login(login=BLUESKY_HANDLE, password=BLUESKY_PASSWORD)
                    except Exception as login_error:
                        # Keep the bot alive; the next poll will try again.
                        print(f"Re-authentication failed: {login_error}")

        # Initial poll, after a short wait
        print(f"Waiting {initial_delay} seconds before initial poll...")
        time.sleep(initial_delay)
        poll_notifications()

        # Continue polling
        while True:
            time.sleep(poll_interval)
            poll_notifications()

    except KeyboardInterrupt:
        print("\nShutting down gracefully...")
    except Exception as error:
        print(f"Fatal error: {error}")
        traceback.print_exc()
        raise


if __name__ == "__main__":
    main()