Alternative ATProto PDS implementation
1//! Identity endpoints (/xrpc/com.atproto.identity.*)
2use std::collections::HashMap;
3
4use anyhow::{Context as _, anyhow};
5use atrium_api::{
6 com::atproto::identity,
7 types::string::{Datetime, Handle},
8};
9use atrium_crypto::keypair::Did as _;
10use atrium_repo::blockstore::{AsyncBlockStoreWrite as _, CarStore, DAG_CBOR, SHA2_256};
11use axum::{
12 Json, Router,
13 extract::{Query, State},
14 http::StatusCode,
15 routing::{get, post},
16};
17use constcat::concat;
18
19use crate::{
20 AppState, Client, Db, Error, Result, RotationKey, SigningKey,
21 auth::AuthenticatedUser,
22 config::AppConfig,
23 did,
24 firehose::FirehoseProducer,
25 plc::{self, PlcOperation, PlcService},
26};
27
28/// (GET) Resolves an atproto handle (hostname) to a DID. Does not necessarily bi-directionally verify against the the DID document.
29/// ### Query Parameters
30/// - handle: The handle to resolve.
31/// ### Responses
32/// - 200 OK: {did: did}
33/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `HandleNotFound`]}
34/// - 401 Unauthorized
35async fn resolve_handle(
36 State(db): State<Db>,
37 State(client): State<Client>,
38 Query(input): Query<identity::resolve_handle::ParametersData>,
39) -> Result<Json<identity::resolve_handle::Output>> {
40 let handle = input.handle.as_str();
41 if let Ok(did) = sqlx::query_scalar!(r#"SELECT did FROM handles WHERE handle = ?"#, handle)
42 .fetch_one(&db)
43 .await
44 {
45 return Ok(Json(
46 identity::resolve_handle::OutputData {
47 did: atrium_api::types::string::Did::new(did).expect("should be valid DID format"),
48 }
49 .into(),
50 ));
51 }
52
53 // HACK: Query bsky to see if they have this handle cached.
54 let response = client
55 .get(format!(
56 "https://api.bsky.app/xrpc/com.atproto.identity.resolveHandle?handle={handle}"
57 ))
58 .send()
59 .await
60 .context("failed to query upstream server")?
61 .json()
62 .await
63 .context("failed to decode response as JSON")?;
64
65 Ok(Json(response))
66}
67
68#[expect(unused_variables, clippy::todo, reason = "Not yet implemented")]
69/// Request an email with a code to in order to request a signed PLC operation. Requires Auth.
70/// - POST /xrpc/com.atproto.identity.requestPlcOperationSignature
71/// ### Responses
72/// - 200 OK
73/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]}
74/// - 401 Unauthorized
75async fn request_plc_operation_signature(user: AuthenticatedUser) -> Result<()> {
76 todo!()
77}
78
79#[expect(unused_variables, clippy::todo, reason = "Not yet implemented")]
80/// Signs a PLC operation to update some value(s) in the requesting DID's document.
81/// - POST /xrpc/com.atproto.identity.signPlcOperation
82/// ### Request Body
83/// - token: string // A token received through com.atproto.identity.requestPlcOperationSignature
84/// - rotationKeys: string[]
85/// - alsoKnownAs: string[]
86/// - verificationMethods: services
87/// ### Responses
88/// - 200 OK: {operation: string}
89/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]}
90/// - 401 Unauthorized
91async fn sign_plc_operation(
92 user: AuthenticatedUser,
93 State(skey): State<SigningKey>,
94 State(rkey): State<RotationKey>,
95 State(config): State<AppConfig>,
96 Json(input): Json<identity::sign_plc_operation::Input>,
97) -> Result<Json<identity::sign_plc_operation::Output>> {
98 todo!()
99}
100
101#[expect(
102 clippy::too_many_arguments,
103 reason = "Many parameters are required for this endpoint"
104)]
105/// Updates the current account's handle. Verifies handle validity, and updates did:plc document if necessary. Implemented by PDS, and requires auth.
106/// - POST /xrpc/com.atproto.identity.updateHandle
107/// ### Query Parameters
108/// - handle: handle // The new handle.
109/// ### Responses
110/// - 200 OK
111/// ## Errors
112/// - If the handle is already in use.
113/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]}
114/// - 401 Unauthorized
115/// ## Panics
116/// - If the handle is not valid.
117async fn update_handle(
118 user: AuthenticatedUser,
119 State(skey): State<SigningKey>,
120 State(rkey): State<RotationKey>,
121 State(client): State<Client>,
122 State(config): State<AppConfig>,
123 State(db): State<Db>,
124 State(fhp): State<FirehoseProducer>,
125 Json(input): Json<identity::update_handle::Input>,
126) -> Result<()> {
127 let handle = input.handle.as_str();
128 let did_str = user.did();
129 let did = atrium_api::types::string::Did::new(user.did()).expect("should be valid DID format");
130
131 if let Some(existing_did) =
132 sqlx::query_scalar!(r#"SELECT did FROM handles WHERE handle = ?"#, handle)
133 .fetch_optional(&db)
134 .await
135 .context("failed to query did count")?
136 {
137 if existing_did != did_str {
138 return Err(Error::with_status(
139 StatusCode::BAD_REQUEST,
140 anyhow!("attempted to update handle to one that is already in use"),
141 ));
142 }
143 }
144
145 // Ensure the existing DID is resolvable.
146 // If not, we need to register the original handle.
147 let _did = did::resolve(&client, did.clone())
148 .await
149 .with_context(|| format!("failed to resolve DID for {did_str}"))
150 .context("should be able to resolve DID")?;
151
152 let op = plc::sign_op(
153 &rkey,
154 PlcOperation {
155 typ: "plc_operation".to_owned(),
156 rotation_keys: vec![rkey.did()],
157 verification_methods: HashMap::from([("atproto".to_owned(), skey.did())]),
158 also_known_as: vec![input.handle.as_str().to_owned()],
159 services: HashMap::from([(
160 "atproto_pds".to_owned(),
161 PlcService::Pds {
162 endpoint: config.host_name.clone(),
163 },
164 )]),
165 prev: Some(
166 sqlx::query_scalar!(r#"SELECT plc_root FROM accounts WHERE did = ?"#, did_str)
167 .fetch_one(&db)
168 .await
169 .context("failed to fetch user PLC root")?,
170 ),
171 },
172 )
173 .context("failed to sign plc op")?;
174
175 if !config.test {
176 plc::submit(&client, did.as_str(), &op)
177 .await
178 .context("failed to submit PLC operation")?;
179 }
180
181 // FIXME: Properly abstract these implementation details.
182 let did_hash = did_str
183 .strip_prefix("did:plc:")
184 .context("should be valid DID format")?;
185 let doc = tokio::fs::File::options()
186 .read(true)
187 .write(true)
188 .open(config.plc.path.join(format!("{did_hash}.car")))
189 .await
190 .context("failed to open did doc")?;
191
192 let op_bytes = serde_ipld_dagcbor::to_vec(&op).context("failed to encode plc op")?;
193
194 let plc_cid = CarStore::open(doc)
195 .await
196 .context("failed to open did carstore")?
197 .write_block(DAG_CBOR, SHA2_256, &op_bytes)
198 .await
199 .context("failed to write genesis commit")?;
200
201 let cid_str = plc_cid.to_string();
202
203 _ = sqlx::query!(
204 r#"UPDATE accounts SET plc_root = ? WHERE did = ?"#,
205 cid_str,
206 did_str
207 )
208 .execute(&db)
209 .await
210 .context("failed to update account PLC root")?;
211
212 // Broadcast the identity event now that the new identity is resolvable on the public directory.
213 fhp.identity(
214 atrium_api::com::atproto::sync::subscribe_repos::IdentityData {
215 did: did.clone(),
216 handle: Some(Handle::new(handle.to_owned()).expect("should be valid handle")),
217 seq: 0, // Filled by firehose later.
218 time: Datetime::now(),
219 },
220 )
221 .await;
222
223 Ok(())
224}
225
226async fn todo() -> Result<()> {
227 Err(Error::unimplemented(anyhow!("not implemented")))
228}
229
230#[rustfmt::skip]
231/// Identity endpoints (/xrpc/com.atproto.identity.*)
232/// ### Routes
233/// - AP /xrpc/com.atproto.identity.updateHandle -> [`update_handle`]
234/// - AP /xrpc/com.atproto.identity.requestPlcOperationSignature -> [`request_plc_operation_signature`]
235/// - AP /xrpc/com.atproto.identity.signPlcOperation -> [`sign_plc_operation`]
236/// - UG /xrpc/com.atproto.identity.resolveHandle -> [`resolve_handle`]
237pub(super) fn routes() -> Router<AppState> {
238 Router::new()
239 .route(concat!("/", identity::get_recommended_did_credentials::NSID), get(todo))
240 .route(concat!("/", identity::request_plc_operation_signature::NSID), post(request_plc_operation_signature))
241 .route(concat!("/", identity::resolve_handle::NSID), get(resolve_handle))
242 .route(concat!("/", identity::sign_plc_operation::NSID), post(sign_plc_operation))
243 .route(concat!("/", identity::submit_plc_operation::NSID), post(todo))
244 .route(concat!("/", identity::update_handle::NSID), post(update_handle))
245}