a digital person for bluesky
1"""Generic ATProto record retrieval tool."""
2from typing import Optional
3from pydantic import BaseModel, Field
4
5
6class GetRecordArgs(BaseModel):
7 uri: Optional[str] = Field(
8 None,
9 description="Full AT URI (e.g., 'at://did:plc:xyz/app.bsky.feed.post/abc123'). If provided, repo/collection/rkey are ignored."
10 )
11 repo: Optional[str] = Field(
12 None,
13 description="DID or handle of the repo (e.g., 'did:plc:xyz' or 'user.bsky.social')"
14 )
15 collection: Optional[str] = Field(
16 None,
17 description="NSID of the collection (e.g., 'app.bsky.feed.post', 'app.bsky.actor.profile', 'app.bsky.graph.follow')"
18 )
19 rkey: Optional[str] = Field(
20 None,
21 description="Record key within the collection"
22 )
23
24
25def get_atproto_record(
26 uri: str = None,
27 repo: str = None,
28 collection: str = None,
29 rkey: str = None
30) -> str:
31 """
32 Retrieve any ATProto record by URI or by repo/collection/rkey components.
33
34 This is a generic tool for fetching records from the AT Protocol network.
35 Common collections include:
36 - app.bsky.feed.post - Posts
37 - app.bsky.feed.like - Likes
38 - app.bsky.feed.repost - Reposts
39 - app.bsky.actor.profile - Profile records
40 - app.bsky.graph.follow - Follow records
41 - app.bsky.graph.block - Block records
42 - app.bsky.graph.list - List records
43 - app.bsky.graph.listitem - List item records
44 - app.bsky.feed.generator - Feed generator records
45 - app.bsky.labeler.service - Labeler service records
46
47 Args:
48 uri: Full AT URI (e.g., 'at://did:plc:xyz/app.bsky.feed.post/abc123').
49 If provided, repo/collection/rkey are ignored.
50 repo: DID or handle of the repo (e.g., 'did:plc:xyz' or 'user.bsky.social')
51 collection: NSID of the collection (e.g., 'app.bsky.feed.post')
52 rkey: Record key within the collection
53
54 Returns:
55 YAML-formatted record data including the record value and metadata
56 """
57 import os
58 import re
59 import yaml
60 import requests
61
62 try:
63 # Parse URI if provided
64 if uri:
65 # AT URI format: at://did:plc:xyz/collection/rkey
66 # or at://handle/collection/rkey
67 match = re.match(r'^at://([^/]+)/([^/]+)/(.+)$', uri)
68 if not match:
69 raise Exception(f"Invalid AT URI format: {uri}. Expected format: at://repo/collection/rkey")
70 repo = match.group(1)
71 collection = match.group(2)
72 rkey = match.group(3)
73
74 # Validate we have all required params
75 if not repo:
76 raise Exception("repo is required (either via uri or repo parameter)")
77 if not collection:
78 raise Exception("collection is required (either via uri or collection parameter)")
79 if not rkey:
80 raise Exception("rkey is required (either via uri or rkey parameter)")
81
82 # Use public API endpoint (no auth required for public records)
83 # Try the public API first, fall back to authenticated if needed
84 public_url = "https://public.api.bsky.app/xrpc/com.atproto.repo.getRecord"
85 params = {
86 "repo": repo,
87 "collection": collection,
88 "rkey": rkey
89 }
90
91 try:
92 response = requests.get(public_url, params=params, timeout=15)
93
94 # If public API fails with auth error, try authenticated
95 if response.status_code == 401 or response.status_code == 403:
96 raise Exception("Auth required")
97
98 response.raise_for_status()
99 record_data = response.json()
100
101 except Exception as public_error:
102 # Fall back to authenticated request
103 username = os.getenv("BSKY_USERNAME")
104 password = os.getenv("BSKY_PASSWORD")
105 pds_host = os.getenv("PDS_URI", "https://bsky.social")
106
107 if not username or not password:
108 raise Exception(f"Public API failed and no credentials available: {str(public_error)}")
109
110 # Create session
111 session_url = f"{pds_host}/xrpc/com.atproto.server.createSession"
112 session_data = {
113 "identifier": username,
114 "password": password
115 }
116
117 session_response = requests.post(session_url, json=session_data, timeout=10)
118 session_response.raise_for_status()
119 session = session_response.json()
120 access_token = session.get("accessJwt")
121
122 if not access_token:
123 raise Exception("Failed to get access token from session")
124
125 # Fetch record with auth
126 headers = {"Authorization": f"Bearer {access_token}"}
127 auth_url = f"{pds_host}/xrpc/com.atproto.repo.getRecord"
128
129 response = requests.get(auth_url, headers=headers, params=params, timeout=15)
130 response.raise_for_status()
131 record_data = response.json()
132
133 # Format output
134 result = {
135 "record": {
136 "uri": record_data.get("uri", f"at://{repo}/{collection}/{rkey}"),
137 "cid": record_data.get("cid", ""),
138 "collection": collection,
139 "repo": repo,
140 "rkey": rkey,
141 "value": record_data.get("value", {})
142 }
143 }
144
145 return yaml.dump(result, default_flow_style=False, sort_keys=False, allow_unicode=True)
146
147 except requests.exceptions.HTTPError as e:
148 if e.response is not None:
149 if e.response.status_code == 404:
150 raise Exception(f"Record not found: at://{repo}/{collection}/{rkey}")
151 try:
152 error_body = e.response.json()
153 error_msg = error_body.get("message", str(e))
154 except:
155 error_msg = str(e)
156 raise Exception(f"HTTP error fetching record: {error_msg}")
157 raise Exception(f"HTTP error fetching record: {str(e)}")
158 except Exception as e:
159 raise Exception(f"Error fetching ATProto record: {str(e)}")