Alternative ATProto PDS implementation
6
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 66e42ce395bf2d6dcee6d67d0260fe92b912c055 245 lines 8.9 kB view raw
1//! Identity endpoints (/xrpc/com.atproto.identity.*) 2use std::collections::HashMap; 3 4use anyhow::{Context as _, anyhow}; 5use atrium_api::{ 6 com::atproto::identity, 7 types::string::{Datetime, Handle}, 8}; 9use atrium_crypto::keypair::Did as _; 10use atrium_repo::blockstore::{AsyncBlockStoreWrite as _, CarStore, DAG_CBOR, SHA2_256}; 11use axum::{ 12 Json, Router, 13 extract::{Query, State}, 14 http::StatusCode, 15 routing::{get, post}, 16}; 17use constcat::concat; 18 19use crate::{ 20 AppState, Client, Db, Error, Result, RotationKey, SigningKey, 21 auth::AuthenticatedUser, 22 config::AppConfig, 23 did, 24 firehose::FirehoseProducer, 25 plc::{self, PlcOperation, PlcService}, 26}; 27 28/// (GET) Resolves an atproto handle (hostname) to a DID. Does not necessarily bi-directionally verify against the the DID document. 29/// ### Query Parameters 30/// - handle: The handle to resolve. 31/// ### Responses 32/// - 200 OK: {did: did} 33/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `HandleNotFound`]} 34/// - 401 Unauthorized 35async fn resolve_handle( 36 State(db): State<Db>, 37 State(client): State<Client>, 38 Query(input): Query<identity::resolve_handle::ParametersData>, 39) -> Result<Json<identity::resolve_handle::Output>> { 40 let handle = input.handle.as_str(); 41 if let Ok(did) = sqlx::query_scalar!(r#"SELECT did FROM handles WHERE handle = ?"#, handle) 42 .fetch_one(&db) 43 .await 44 { 45 return Ok(Json( 46 identity::resolve_handle::OutputData { 47 did: atrium_api::types::string::Did::new(did).expect("should be valid DID format"), 48 } 49 .into(), 50 )); 51 } 52 53 // HACK: Query bsky to see if they have this handle cached. 54 let response = client 55 .get(format!( 56 "https://api.bsky.app/xrpc/com.atproto.identity.resolveHandle?handle={handle}" 57 )) 58 .send() 59 .await 60 .context("failed to query upstream server")? 61 .json() 62 .await 63 .context("failed to decode response as JSON")?; 64 65 Ok(Json(response)) 66} 67 68#[expect(unused_variables, clippy::todo, reason = "Not yet implemented")] 69/// Request an email with a code to in order to request a signed PLC operation. Requires Auth. 70/// - POST /xrpc/com.atproto.identity.requestPlcOperationSignature 71/// ### Responses 72/// - 200 OK 73/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 74/// - 401 Unauthorized 75async fn request_plc_operation_signature(user: AuthenticatedUser) -> Result<()> { 76 todo!() 77} 78 79#[expect(unused_variables, clippy::todo, reason = "Not yet implemented")] 80/// Signs a PLC operation to update some value(s) in the requesting DID's document. 81/// - POST /xrpc/com.atproto.identity.signPlcOperation 82/// ### Request Body 83/// - token: string // A token received through com.atproto.identity.requestPlcOperationSignature 84/// - rotationKeys: string[] 85/// - alsoKnownAs: string[] 86/// - verificationMethods: services 87/// ### Responses 88/// - 200 OK: {operation: string} 89/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 90/// - 401 Unauthorized 91async fn sign_plc_operation( 92 user: AuthenticatedUser, 93 State(skey): State<SigningKey>, 94 State(rkey): State<RotationKey>, 95 State(config): State<AppConfig>, 96 Json(input): Json<identity::sign_plc_operation::Input>, 97) -> Result<Json<identity::sign_plc_operation::Output>> { 98 todo!() 99} 100 101#[expect( 102 clippy::too_many_arguments, 103 reason = "Many parameters are required for this endpoint" 104)] 105/// Updates the current account's handle. Verifies handle validity, and updates did:plc document if necessary. Implemented by PDS, and requires auth. 106/// - POST /xrpc/com.atproto.identity.updateHandle 107/// ### Query Parameters 108/// - handle: handle // The new handle. 109/// ### Responses 110/// - 200 OK 111/// ## Errors 112/// - If the handle is already in use. 113/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 114/// - 401 Unauthorized 115/// ## Panics 116/// - If the handle is not valid. 117async fn update_handle( 118 user: AuthenticatedUser, 119 State(skey): State<SigningKey>, 120 State(rkey): State<RotationKey>, 121 State(client): State<Client>, 122 State(config): State<AppConfig>, 123 State(db): State<Db>, 124 State(fhp): State<FirehoseProducer>, 125 Json(input): Json<identity::update_handle::Input>, 126) -> Result<()> { 127 let handle = input.handle.as_str(); 128 let did_str = user.did(); 129 let did = atrium_api::types::string::Did::new(user.did()).expect("should be valid DID format"); 130 131 if let Some(existing_did) = 132 sqlx::query_scalar!(r#"SELECT did FROM handles WHERE handle = ?"#, handle) 133 .fetch_optional(&db) 134 .await 135 .context("failed to query did count")? 136 { 137 if existing_did != did_str { 138 return Err(Error::with_status( 139 StatusCode::BAD_REQUEST, 140 anyhow!("attempted to update handle to one that is already in use"), 141 )); 142 } 143 } 144 145 // Ensure the existing DID is resolvable. 146 // If not, we need to register the original handle. 147 let _did = did::resolve(&client, did.clone()) 148 .await 149 .with_context(|| format!("failed to resolve DID for {did_str}")) 150 .context("should be able to resolve DID")?; 151 152 let op = plc::sign_op( 153 &rkey, 154 PlcOperation { 155 typ: "plc_operation".to_owned(), 156 rotation_keys: vec![rkey.did()], 157 verification_methods: HashMap::from([("atproto".to_owned(), skey.did())]), 158 also_known_as: vec![input.handle.as_str().to_owned()], 159 services: HashMap::from([( 160 "atproto_pds".to_owned(), 161 PlcService::Pds { 162 endpoint: config.host_name.clone(), 163 }, 164 )]), 165 prev: Some( 166 sqlx::query_scalar!(r#"SELECT plc_root FROM accounts WHERE did = ?"#, did_str) 167 .fetch_one(&db) 168 .await 169 .context("failed to fetch user PLC root")?, 170 ), 171 }, 172 ) 173 .context("failed to sign plc op")?; 174 175 if !config.test { 176 plc::submit(&client, did.as_str(), &op) 177 .await 178 .context("failed to submit PLC operation")?; 179 } 180 181 // FIXME: Properly abstract these implementation details. 182 let did_hash = did_str 183 .strip_prefix("did:plc:") 184 .context("should be valid DID format")?; 185 let doc = tokio::fs::File::options() 186 .read(true) 187 .write(true) 188 .open(config.plc.path.join(format!("{did_hash}.car"))) 189 .await 190 .context("failed to open did doc")?; 191 192 let op_bytes = serde_ipld_dagcbor::to_vec(&op).context("failed to encode plc op")?; 193 194 let plc_cid = CarStore::open(doc) 195 .await 196 .context("failed to open did carstore")? 197 .write_block(DAG_CBOR, SHA2_256, &op_bytes) 198 .await 199 .context("failed to write genesis commit")?; 200 201 let cid_str = plc_cid.to_string(); 202 203 _ = sqlx::query!( 204 r#"UPDATE accounts SET plc_root = ? WHERE did = ?"#, 205 cid_str, 206 did_str 207 ) 208 .execute(&db) 209 .await 210 .context("failed to update account PLC root")?; 211 212 // Broadcast the identity event now that the new identity is resolvable on the public directory. 213 fhp.identity( 214 atrium_api::com::atproto::sync::subscribe_repos::IdentityData { 215 did: did.clone(), 216 handle: Some(Handle::new(handle.to_owned()).expect("should be valid handle")), 217 seq: 0, // Filled by firehose later. 218 time: Datetime::now(), 219 }, 220 ) 221 .await; 222 223 Ok(()) 224} 225 226async fn todo() -> Result<()> { 227 Err(Error::unimplemented(anyhow!("not implemented"))) 228} 229 230#[rustfmt::skip] 231/// Identity endpoints (/xrpc/com.atproto.identity.*) 232/// ### Routes 233/// - AP /xrpc/com.atproto.identity.updateHandle -> [`update_handle`] 234/// - AP /xrpc/com.atproto.identity.requestPlcOperationSignature -> [`request_plc_operation_signature`] 235/// - AP /xrpc/com.atproto.identity.signPlcOperation -> [`sign_plc_operation`] 236/// - UG /xrpc/com.atproto.identity.resolveHandle -> [`resolve_handle`] 237pub(super) fn routes() -> Router<AppState> { 238 Router::new() 239 .route(concat!("/", identity::get_recommended_did_credentials::NSID), get(todo)) 240 .route(concat!("/", identity::request_plc_operation_signature::NSID), post(request_plc_operation_signature)) 241 .route(concat!("/", identity::resolve_handle::NSID), get(resolve_handle)) 242 .route(concat!("/", identity::sign_plc_operation::NSID), post(sign_plc_operation)) 243 .route(concat!("/", identity::submit_plc_operation::NSID), post(todo)) 244 .route(concat!("/", identity::update_handle::NSID), post(update_handle)) 245}