a digital person for bluesky
at main 159 lines 6.2 kB view raw
1"""Generic ATProto record retrieval tool.""" 2from typing import Optional 3from pydantic import BaseModel, Field 4 5 6class GetRecordArgs(BaseModel): 7 uri: Optional[str] = Field( 8 None, 9 description="Full AT URI (e.g., 'at://did:plc:xyz/app.bsky.feed.post/abc123'). If provided, repo/collection/rkey are ignored." 10 ) 11 repo: Optional[str] = Field( 12 None, 13 description="DID or handle of the repo (e.g., 'did:plc:xyz' or 'user.bsky.social')" 14 ) 15 collection: Optional[str] = Field( 16 None, 17 description="NSID of the collection (e.g., 'app.bsky.feed.post', 'app.bsky.actor.profile', 'app.bsky.graph.follow')" 18 ) 19 rkey: Optional[str] = Field( 20 None, 21 description="Record key within the collection" 22 ) 23 24 25def get_atproto_record( 26 uri: str = None, 27 repo: str = None, 28 collection: str = None, 29 rkey: str = None 30) -> str: 31 """ 32 Retrieve any ATProto record by URI or by repo/collection/rkey components. 33 34 This is a generic tool for fetching records from the AT Protocol network. 35 Common collections include: 36 - app.bsky.feed.post - Posts 37 - app.bsky.feed.like - Likes 38 - app.bsky.feed.repost - Reposts 39 - app.bsky.actor.profile - Profile records 40 - app.bsky.graph.follow - Follow records 41 - app.bsky.graph.block - Block records 42 - app.bsky.graph.list - List records 43 - app.bsky.graph.listitem - List item records 44 - app.bsky.feed.generator - Feed generator records 45 - app.bsky.labeler.service - Labeler service records 46 47 Args: 48 uri: Full AT URI (e.g., 'at://did:plc:xyz/app.bsky.feed.post/abc123'). 49 If provided, repo/collection/rkey are ignored. 50 repo: DID or handle of the repo (e.g., 'did:plc:xyz' or 'user.bsky.social') 51 collection: NSID of the collection (e.g., 'app.bsky.feed.post') 52 rkey: Record key within the collection 53 54 Returns: 55 YAML-formatted record data including the record value and metadata 56 """ 57 import os 58 import re 59 import yaml 60 import requests 61 62 try: 63 # Parse URI if provided 64 if uri: 65 # AT URI format: at://did:plc:xyz/collection/rkey 66 # or at://handle/collection/rkey 67 match = re.match(r'^at://([^/]+)/([^/]+)/(.+)$', uri) 68 if not match: 69 raise Exception(f"Invalid AT URI format: {uri}. Expected format: at://repo/collection/rkey") 70 repo = match.group(1) 71 collection = match.group(2) 72 rkey = match.group(3) 73 74 # Validate we have all required params 75 if not repo: 76 raise Exception("repo is required (either via uri or repo parameter)") 77 if not collection: 78 raise Exception("collection is required (either via uri or collection parameter)") 79 if not rkey: 80 raise Exception("rkey is required (either via uri or rkey parameter)") 81 82 # Use public API endpoint (no auth required for public records) 83 # Try the public API first, fall back to authenticated if needed 84 public_url = "https://public.api.bsky.app/xrpc/com.atproto.repo.getRecord" 85 params = { 86 "repo": repo, 87 "collection": collection, 88 "rkey": rkey 89 } 90 91 try: 92 response = requests.get(public_url, params=params, timeout=15) 93 94 # If public API fails with auth error, try authenticated 95 if response.status_code == 401 or response.status_code == 403: 96 raise Exception("Auth required") 97 98 response.raise_for_status() 99 record_data = response.json() 100 101 except Exception as public_error: 102 # Fall back to authenticated request 103 username = os.getenv("BSKY_USERNAME") 104 password = os.getenv("BSKY_PASSWORD") 105 pds_host = os.getenv("PDS_URI", "https://bsky.social") 106 107 if not username or not password: 108 raise Exception(f"Public API failed and no credentials available: {str(public_error)}") 109 110 # Create session 111 session_url = f"{pds_host}/xrpc/com.atproto.server.createSession" 112 session_data = { 113 "identifier": username, 114 "password": password 115 } 116 117 session_response = requests.post(session_url, json=session_data, timeout=10) 118 session_response.raise_for_status() 119 session = session_response.json() 120 access_token = session.get("accessJwt") 121 122 if not access_token: 123 raise Exception("Failed to get access token from session") 124 125 # Fetch record with auth 126 headers = {"Authorization": f"Bearer {access_token}"} 127 auth_url = f"{pds_host}/xrpc/com.atproto.repo.getRecord" 128 129 response = requests.get(auth_url, headers=headers, params=params, timeout=15) 130 response.raise_for_status() 131 record_data = response.json() 132 133 # Format output 134 result = { 135 "record": { 136 "uri": record_data.get("uri", f"at://{repo}/{collection}/{rkey}"), 137 "cid": record_data.get("cid", ""), 138 "collection": collection, 139 "repo": repo, 140 "rkey": rkey, 141 "value": record_data.get("value", {}) 142 } 143 } 144 145 return yaml.dump(result, default_flow_style=False, sort_keys=False, allow_unicode=True) 146 147 except requests.exceptions.HTTPError as e: 148 if e.response is not None: 149 if e.response.status_code == 404: 150 raise Exception(f"Record not found: at://{repo}/{collection}/{rkey}") 151 try: 152 error_body = e.response.json() 153 error_msg = error_body.get("message", str(e)) 154 except: 155 error_msg = str(e) 156 raise Exception(f"HTTP error fetching record: {error_msg}") 157 raise Exception(f"HTTP error fetching record: {str(e)}") 158 except Exception as e: 159 raise Exception(f"Error fetching ATProto record: {str(e)}")