"""Bluesky bot that answers mentions with replies generated by Claude."""

import os
import re
import time
from datetime import datetime
from typing import Optional, Set

import anthropic
from atproto import Client
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configuration
BLUESKY_HANDLE = os.getenv("BLUESKY_HANDLE")
BLUESKY_PASSWORD = os.getenv("BLUESKY_PASSWORD")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
BOT_HANDLE = os.getenv("BOT_HANDLE", "claude.altq.net")

# Fail fast at import time: nothing below can work without credentials.
if not BLUESKY_HANDLE or not BLUESKY_PASSWORD:
    raise ValueError("BLUESKY_HANDLE and BLUESKY_PASSWORD must be set in environment variables")
if not ANTHROPIC_API_KEY:
    raise ValueError("ANTHROPIC_API_KEY must be set in environment variables")

# Initialize clients
anthropic_client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
# Use custom PDS at altq.net
bluesky_client = Client(base_url="https://altq.net")

# Track processed posts to avoid duplicates
processed_posts: Set[str] = set()


def extract_post_text(record) -> str:
    """Extract text from a post record.

    Args:
        record: Either an object exposing a ``text`` attribute or a dict with
            a ``"text"`` key.

    Returns:
        The record's text, or ``""`` when the record has no string text
        (missing, ``None`` or a non-string value), so callers can always
        treat the result as ``str``.
    """
    if isinstance(record, dict):
        text = record.get("text")
    else:
        text = getattr(record, "text", None)
    return text if isinstance(text, str) else ""


def _mention_regex(bot_handle: str) -> "re.Pattern":
    """Build the case-insensitive pattern matching an @mention of *bot_handle*.

    The pattern only matches the exact handle: it refuses a match that is
    preceded by a handle character (e.g. an e-mail address) or followed by
    more handle characters (e.g. ``@claude.altq.network``). A trailing
    sentence period is still accepted. Whitespace after the mention is
    included in the match so it can be removed together with the mention.
    """
    handle = bot_handle.lower().replace("@", "")
    return re.compile(
        rf"(?<![\w.-])@{re.escape(handle)}(?![\w-]|\.\w)\s*",
        re.IGNORECASE,
    )


def mentions_bot(text: str, bot_handle: str) -> bool:
    """Check if the post mentions the bot.

    Args:
        text: Post text to inspect.
        bot_handle: The bot's handle, with or without a leading ``@``.

    Returns:
        ``True`` when *text* contains an @mention of exactly *bot_handle*
        (case-insensitive), ``False`` otherwise.
    """
    return _mention_regex(bot_handle).search(text) is not None


def extract_question(text: str, bot_handle: str) -> str:
    """Extract the actual question/request from the post.

    Removes every mention of the bot, then any run of other mentions left at
    the very start of the text.

    Args:
        text: Full post text.
        bot_handle: The bot's handle, with or without a leading ``@``.

    Returns:
        The remaining text, or a default greeting when nothing is left.
    """
    # Remove the mention from the text
    question = _mention_regex(bot_handle).sub("", text).strip()
    # Remove all other leading mentions; the character class accepts word
    # characters, dots and hyphens so hyphenated handles are stripped too.
    question = re.sub(r"^(?:@[\w.-]+\s+)+", "", question)
    return question if question else "Hello! How can I help you?"
def get_claude_response(question: str) -> str:
    """Get Claude's response to a question.

    Args:
        question: The user's request, inserted verbatim into the prompt.

    Returns:
        The text of the first text block in Claude's reply, or a fixed
        apology string when the reply contains no text block.

    Raises:
        Exception: Any error raised by the Anthropic client is logged and
            re-raised unchanged.
    """
    import traceback

    try:
        prompt = f"""You will be acting as a simple bot that posts on Bluesky (a social media platform similar to Twitter). You will be given a topic or prompt to respond to.

{question}

Here are the guidelines for your response:
- Keep your response short and concise (similar to a tweet - aim for 1-2 sentences maximum)
- Write in a casual, friendly tone appropriate for social media
- Stay on topic and provide a relevant response to the given prompt
- Avoid controversial topics, offensive language, or anything inappropriate for a public social media post
- Do not include hashtags, mentions (@), or special formatting unless specifically relevant to the topic
- Write as if you're a helpful, conversational bot engaging with the Bluesky community

Your response should be brief, engaging, and suitable for posting directly on Bluesky. Write only the post content - do not include any explanations, meta-commentary, or additional text beyond what would appear in the actual social media post."""

        message = anthropic_client.messages.create(
            model="claude-sonnet-4-5-20250929",
            max_tokens=20000,
            temperature=1,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": prompt,
                        }
                    ],
                }
            ],
        )

        # Return the first block that carries text rather than assuming it
        # is always the block at index 0.
        for content_block in message.content or []:
            if hasattr(content_block, 'text'):
                return content_block.text
            if isinstance(content_block, dict) and 'text' in content_block:
                return content_block['text']

        return "I'm sorry, I couldn't generate a text response."
    except Exception as error:
        print(f"Error calling Claude API: {error}")
        traceback.print_exc()
        raise


def _split_oversized(fragment: str, max_length: int) -> list:
    """Break *fragment* into pieces of at most *max_length* characters.

    Splits on whitespace where possible; a single word longer than the limit
    is cut into fixed-size slices. Returns ``[]`` for an empty fragment.
    """
    if len(fragment) <= max_length:
        return [fragment] if fragment else []

    pieces = []
    current = ""
    for word in fragment.split():
        # A word that cannot fit in any chunk has to be cut mid-word.
        while len(word) > max_length:
            if current:
                pieces.append(current)
                current = ""
            pieces.append(word[:max_length])
            word = word[max_length:]

        candidate = f"{current} {word}" if current else word
        if len(candidate) <= max_length:
            current = candidate
        else:
            pieces.append(current)
            current = word

    if current:
        pieces.append(current)
    return pieces


def split_text(text: str, max_length: int = 280) -> list:
    """Split text into chunks that fit within the character limit.

    Sentences (ending in ``.``, ``!`` or ``?``) are packed greedily into
    chunks and keep their punctuation. A sentence longer than the limit is
    split on word boundaries, and a word longer than the limit is cut.

    Args:
        text: Text to split.
        max_length: Maximum number of characters per chunk; must be positive.

    Returns:
        A list of chunks, each at most *max_length* characters. Text that
        already fits is returned unchanged as a single-element list.

    Raises:
        ValueError: If *max_length* is less than 1.
    """
    if max_length < 1:
        raise ValueError("max_length must be positive")
    if len(text) <= max_length:
        return [text]

    chunks = []
    current = ""
    # Split after sentence punctuation so the punctuation stays attached.
    for sentence in re.split(r"(?<=[.!?])\s+", text.strip()):
        for piece in _split_oversized(sentence, max_length):
            candidate = f"{current} {piece}" if current else piece
            if len(candidate) <= max_length:
                current = candidate
            else:
                # Every piece fits on its own, so this branch is only
                # reached when `current` is non-empty.
                chunks.append(current)
                current = piece

    if current:
        chunks.append(current)
    return chunks


def _response_field(response, name: str, default):
    """Read *name* from a send_post response, its ``value`` or its ``data``.

    Returns *default* when none of those objects exposes the attribute.
    """
    holders = (
        response,
        getattr(response, 'value', None),
        getattr(response, 'data', None),
    )
    for holder in holders:
        if holder is not None and hasattr(holder, name):
            return getattr(holder, name)
    return default


def reply_to_post(parent_uri: str, parent_cid: str, text: str,
                  root_uri: Optional[str] = None, root_cid: Optional[str] = None):
    """Post a reply to Bluesky.

    Text longer than 280 characters is split and posted as a chain of
    replies, each one replying to the previously posted chunk.

    Args:
        parent_uri: URI of the post being replied to.
        parent_cid: CID of the post being replied to.
        text: Reply text.
        root_uri: URI of the thread root; defaults to *parent_uri*.
        root_cid: CID of the thread root; defaults to *parent_cid*.

    Raises:
        Exception: Any error while building or sending a post is logged and
            re-raised unchanged.
    """
    import traceback

    try:
        # Use root if provided, otherwise use parent as root
        root_uri_final = root_uri or parent_uri
        root_cid_final = root_cid or parent_cid

        # Split text if needed
        chunks = split_text(text, max_length=280)

        # Import the ReplyRef model
        from atproto_client.models.app.bsky.feed.post import ReplyRef
        from atproto_client.models.com.atproto.repo.strong_ref import Main as StrongRef

        # The first chunk replies to the parent; each later chunk replies to
        # the chunk posted just before it.
        last_uri = parent_uri
        last_cid = parent_cid

        for chunk in chunks:
            reply_ref = ReplyRef(
                root=StrongRef(uri=root_uri_final, cid=root_cid_final),
                parent=StrongRef(uri=last_uri, cid=last_cid),
            )
            response = bluesky_client.send_post(text=chunk, reply_to=reply_ref)

            # Update last URI/CID for next reply
            if response:
                last_uri = _response_field(response, 'uri', last_uri)
                last_cid = _response_field(response, 'cid', last_cid)

        print(f"Replied to post: {parent_uri}")
    except Exception as error:
        print(f"Error posting reply: {error}")
        traceback.print_exc()
        raise


def process_notification(notification):
    """Process a notification.

    Handles only notifications whose ``reason`` is ``"mention"``, that were
    not written by the bot account, were not handled before, and whose text
    mentions ``BOT_HANDLE``. For those it asks Claude for a response and
    posts it as a reply. Errors are logged and swallowed so one failing
    notification does not stop the others.

    Args:
        notification: A notification object from the Bluesky client.
    """
    import traceback

    try:
        # Only process mentions
        if getattr(notification, 'reason', None) != "mention":
            return

        uri = getattr(notification, 'uri', None)
        cid = getattr(notification, 'cid', None)
        if not uri or not cid:
            return

        # Skip if we've already processed this post
        if uri in processed_posts:
            return

        # Skip if it's our own post
        author = getattr(notification, 'author', None)
        author_handle = getattr(author, 'handle', None) if author else None
        if author_handle is not None and author_handle == BLUESKY_HANDLE:
            return

        # Get the post record
        record = getattr(notification, 'record', None)
        if not record:
            # Try to fetch the post using the thread API
            try:
                thread_response = bluesky_client.get_post_thread(uri)
                thread_data = getattr(thread_response, 'thread', None) if thread_response else None
                post = getattr(thread_data, 'post', None)
                if post is not None and hasattr(post, 'record'):
                    record = post.record
            except Exception as error:
                print(f"Could not fetch post {uri}: {error}")
                return

        if not record:
            print("No record found in notification")
            return

        # Extract text from record
        text = extract_post_text(record)

        # Check if the post mentions the bot
        if not mentions_bot(text, BOT_HANDLE):
            return

        display_handle = author_handle if author_handle is not None else "unknown"
        print(f"Processing mention from @{display_handle}: {text[:100]}...")

        # Mark as processed immediately to avoid duplicate processing
        processed_posts.add(uri)

        # Extract the question
        question = extract_question(text, BOT_HANDLE)
        print(f"Question: {question[:100]}...")

        # Get Claude's response
        response = get_claude_response(question)
        print(f"Claude response: {response[:100]}...")

        # Determine root post (if this is a reply, use the root; otherwise use this post)
        root_uri = uri
        root_cid = cid
        reply_data = getattr(record, 'reply', None)
        if reply_data and hasattr(reply_data, 'root'):
            root = reply_data.root
            root_uri = getattr(root, 'uri', uri)
            root_cid = getattr(root, 'cid', cid)

        # Reply to the post
        reply_to_post(uri, cid, response, root_uri, root_cid)
        print(f"Successfully replied to @{display_handle}")
    except Exception as error:
        print(f"Error processing notification: {error}")
        traceback.print_exc()
        # Don't re-raise, just log - we don't want one error to stop processing


def _notification_timestamp(notification):
    """Return a notification's indexed-at time as an aware datetime, or None.

    Reads ``indexed_at`` (falling back to ``indexedAt``). Strings are parsed
    as ISO 8601 with a trailing ``Z`` treated as UTC. Naive values are
    assumed to be UTC so every returned value can be compared with the
    others. Returns ``None`` when the field is missing or cannot be parsed.
    """
    from datetime import timezone

    raw = getattr(notification, 'indexed_at', None) or getattr(notification, 'indexedAt', None)
    if not raw:
        return None

    if isinstance(raw, datetime):
        parsed = raw
    else:
        try:
            parsed = datetime.fromisoformat(str(raw).replace('Z', '+00:00'))
        except ValueError:
            print(f"Could not parse notification timestamp: {raw!r}")
            return None

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _extract_notifications(response) -> list:
    """Pull the notifications list out of the supported response shapes.

    Checks, in order: a ``notifications`` attribute, ``data.notifications``,
    a dict with a ``'notifications'`` key, and a plain list. Returns ``[]``
    for anything else.
    """
    if hasattr(response, 'notifications'):
        return response.notifications
    if hasattr(response, 'data') and hasattr(response.data, 'notifications'):
        return response.data.notifications
    if isinstance(response, dict) and 'notifications' in response:
        return response['notifications']
    if isinstance(response, list):
        return response
    return []


def main():
    """Main function.

    Logs in to Bluesky, prints the bot profile, then polls notifications
    every 10 seconds and processes new ones until interrupted.

    Raises:
        Exception: Any fatal error outside the polling step (for example a
            failed initial login) is logged and re-raised.
    """
    import traceback

    try:
        # Login to Bluesky
        print(f"Logging in as @{BLUESKY_HANDLE}...")
        bluesky_client.login(login=BLUESKY_HANDLE, password=BLUESKY_PASSWORD)
        print("Logged in successfully!")

        # Get the bot's profile
        try:
            profile = bluesky_client.get_profile(actor=BLUESKY_HANDLE)
            handle = getattr(profile, 'handle', BLUESKY_HANDLE)
            did = getattr(profile, 'did', "unknown")
            print(f"Bot profile: @{handle} ({did})")
        except Exception as e:
            print(f"Could not get profile: {e}")

        print(f"Listening for mentions of @{BOT_HANDLE}")

        # Track the latest notification timestamp to only process new ones
        last_seen_timestamp: Optional[datetime] = None

        # Poll for notifications
        def poll_notifications():
            nonlocal last_seen_timestamp
            try:
                # Get notifications using the correct API
                from atproto_client.models.app.bsky.notification.list_notifications import Params

                params = Params(limit=50)
                response = bluesky_client.app.bsky.notification.list_notifications(params=params)
                notifications = _extract_notifications(response)

                # Filter to only process new notifications
                if last_seen_timestamp is not None:
                    new_notifications = []
                    for notification in notifications:
                        notif_date = _notification_timestamp(notification)
                        if notif_date is not None and notif_date > last_seen_timestamp:
                            new_notifications.append(notification)
                else:
                    # On first run, only process the 10 most recent.
                    # NOTE(review): processed_posts lives in memory only, so
                    # mentions answered before a restart can be answered
                    # again here - confirm whether that is acceptable.
                    new_notifications = notifications[:10]

                # Process each new notification
                for notification in new_notifications:
                    process_notification(notification)

                # Update last seen timestamp from the first listed notification
                if notifications:
                    latest_timestamp = _notification_timestamp(notifications[0])
                    if latest_timestamp is not None and (
                        last_seen_timestamp is None or latest_timestamp > last_seen_timestamp
                    ):
                        last_seen_timestamp = latest_timestamp
            except Exception as error:
                print(f"Error polling notifications: {error}")
                traceback.print_exc()
                # Try to re-authenticate on error
                message = str(error).lower()
                if "expired" in message or "unauthorized" in message or "auth" in message:
                    print("Session expired, re-authenticating...")
                    try:
                        bluesky_client.login(login=BLUESKY_HANDLE, password=BLUESKY_PASSWORD)
                    except Exception as login_error:
                        # Keep polling; the next cycle will try again instead
                        # of a failed login taking down the whole bot.
                        print(f"Re-authentication failed: {login_error}")

        # Initial poll (wait a bit to avoid processing old notifications)
        print("Waiting 2 seconds before initial poll...")
        time.sleep(2)
        poll_notifications()

        # Continue polling
        while True:
            time.sleep(10)
            poll_notifications()
    except KeyboardInterrupt:
        print("\nShutting down gracefully...")
    except Exception as error:
        print(f"Fatal error: {error}")
        traceback.print_exc()
        raise


if __name__ == "__main__":
    main()