"""Generic ATProto record retrieval tool.""" from typing import Optional from pydantic import BaseModel, Field class GetRecordArgs(BaseModel): uri: Optional[str] = Field( None, description="Full AT URI (e.g., 'at://did:plc:xyz/app.bsky.feed.post/abc123'). If provided, repo/collection/rkey are ignored." ) repo: Optional[str] = Field( None, description="DID or handle of the repo (e.g., 'did:plc:xyz' or 'user.bsky.social')" ) collection: Optional[str] = Field( None, description="NSID of the collection (e.g., 'app.bsky.feed.post', 'app.bsky.actor.profile', 'app.bsky.graph.follow')" ) rkey: Optional[str] = Field( None, description="Record key within the collection" ) def get_atproto_record( uri: str = None, repo: str = None, collection: str = None, rkey: str = None ) -> str: """ Retrieve any ATProto record by URI or by repo/collection/rkey components. This is a generic tool for fetching records from the AT Protocol network. Common collections include: - app.bsky.feed.post - Posts - app.bsky.feed.like - Likes - app.bsky.feed.repost - Reposts - app.bsky.actor.profile - Profile records - app.bsky.graph.follow - Follow records - app.bsky.graph.block - Block records - app.bsky.graph.list - List records - app.bsky.graph.listitem - List item records - app.bsky.feed.generator - Feed generator records - app.bsky.labeler.service - Labeler service records Args: uri: Full AT URI (e.g., 'at://did:plc:xyz/app.bsky.feed.post/abc123'). If provided, repo/collection/rkey are ignored. repo: DID or handle of the repo (e.g., 'did:plc:xyz' or 'user.bsky.social') collection: NSID of the collection (e.g., 'app.bsky.feed.post') rkey: Record key within the collection Returns: YAML-formatted record data including the record value and metadata """ import os import re import yaml import requests try: # Parse URI if provided if uri: # AT URI format: at://did:plc:xyz/collection/rkey # or at://handle/collection/rkey match = re.match(r'^at://([^/]+)/([^/]+)/(.+)$', uri) if not match: raise Exception(f"Invalid AT URI format: {uri}. Expected format: at://repo/collection/rkey") repo = match.group(1) collection = match.group(2) rkey = match.group(3) # Validate we have all required params if not repo: raise Exception("repo is required (either via uri or repo parameter)") if not collection: raise Exception("collection is required (either via uri or collection parameter)") if not rkey: raise Exception("rkey is required (either via uri or rkey parameter)") # Use public API endpoint (no auth required for public records) # Try the public API first, fall back to authenticated if needed public_url = "https://public.api.bsky.app/xrpc/com.atproto.repo.getRecord" params = { "repo": repo, "collection": collection, "rkey": rkey } try: response = requests.get(public_url, params=params, timeout=15) # If public API fails with auth error, try authenticated if response.status_code == 401 or response.status_code == 403: raise Exception("Auth required") response.raise_for_status() record_data = response.json() except Exception as public_error: # Fall back to authenticated request username = os.getenv("BSKY_USERNAME") password = os.getenv("BSKY_PASSWORD") pds_host = os.getenv("PDS_URI", "https://bsky.social") if not username or not password: raise Exception(f"Public API failed and no credentials available: {str(public_error)}") # Create session session_url = f"{pds_host}/xrpc/com.atproto.server.createSession" session_data = { "identifier": username, "password": password } session_response = requests.post(session_url, json=session_data, timeout=10) session_response.raise_for_status() session = session_response.json() access_token = session.get("accessJwt") if not access_token: raise Exception("Failed to get access token from session") # Fetch record with auth headers = {"Authorization": f"Bearer {access_token}"} auth_url = f"{pds_host}/xrpc/com.atproto.repo.getRecord" response = requests.get(auth_url, headers=headers, params=params, timeout=15) response.raise_for_status() record_data = response.json() # Format output result = { "record": { "uri": record_data.get("uri", f"at://{repo}/{collection}/{rkey}"), "cid": record_data.get("cid", ""), "collection": collection, "repo": repo, "rkey": rkey, "value": record_data.get("value", {}) } } return yaml.dump(result, default_flow_style=False, sort_keys=False, allow_unicode=True) except requests.exceptions.HTTPError as e: if e.response is not None: if e.response.status_code == 404: raise Exception(f"Record not found: at://{repo}/{collection}/{rkey}") try: error_body = e.response.json() error_msg = error_body.get("message", str(e)) except: error_msg = str(e) raise Exception(f"HTTP error fetching record: {error_msg}") raise Exception(f"HTTP error fetching record: {str(e)}") except Exception as e: raise Exception(f"Error fetching ATProto record: {str(e)}")