Microservice to bring 2FA to self hosted PDSes

createAccount stricter limits and more #3

merged opened by baileytownsend.dev targeting main from feature/CreateAccountStricterRateLimits
  • added rate limiter on the com.atproto.servce.createAccount end point so you can set stricter limits
  • having a /pds/pds.env is no longer a hard requirment
  • fix a bug on com.atproto.server.getSession not returning on oauth logins
  • bit more logging on errors
Labels

None yet.

Participants 1
AT URI
at://did:plc:rnpkyqnmsw4ipey6eotbdnnf/sh.tangled.repo.pull/3ly5ciyc5fx22
+240 -100
Diff #0
+2 -3
Cargo.toml
··· 19 19 hex = "0.4" 20 20 jwt-compact = { version = "0.8.0", features = ["es256k"] } 21 21 scrypt = "0.11" 22 - #lettre = { version = "0.11.18", default-features = false, features = ["pool", "tokio1-rustls", "smtp-transport", "hostname", "builder"] } 23 - #lettre = { version = "0.11", default-features = false, features = ["builder", "webpki-roots", "rustls", "aws-lc-rs", "smtp-transport", "tokio1", "tokio1-rustls"] } 22 + #Leaveing these two cause I think it is needed by the 24 23 aws-lc-rs = "1.13.0" 25 - lettre = { version = "0.11", default-features = false, features = ["builder", "webpki-roots", "rustls", "aws-lc-rs", "smtp-transport", "tokio1", "tokio1-rustls"] } 26 24 rustls = { version = "0.23", default-features = false, features = ["tls12", "std", "logging", "aws_lc_rs"] } 25 + lettre = { version = "0.11", default-features = false, features = ["builder", "webpki-roots", "rustls", "aws-lc-rs", "smtp-transport", "tokio1", "tokio1-rustls"] } 27 26 handlebars = { version = "6.3.2", features = ["rust-embed"] } 28 27 rust-embed = "8.7.2" 29 28 axum-template = { version = "3.0.0", features = ["handlebars"] }
+74 -40
src/middleware.rs
··· 12 12 #[derive(Clone, Debug)] 13 13 pub struct Did(pub Option<String>); 14 14 15 + #[derive(Clone, Copy, Debug, PartialEq, Eq)] 16 + pub enum AuthScheme { 17 + Bearer, 18 + DPoP, 19 + } 20 + 15 21 #[derive(Serialize, Deserialize)] 16 22 pub struct TokenClaims { 17 23 pub sub: String, 18 24 } 19 25 20 26 pub async fn extract_did(mut req: Request, next: Next) -> impl IntoResponse { 21 - let token = extract_bearer(req.headers()); 27 + let auth = extract_auth(req.headers()); 22 28 23 - match token { 24 - Ok(token) => { 25 - match token { 29 + match auth { 30 + Ok(auth_opt) => { 31 + match auth_opt { 26 32 None => json_error_response(StatusCode::BAD_REQUEST, "TokenRequired", "") 27 33 .expect("Error creating an error response"), 28 - Some(token) => { 29 - let token = UntrustedToken::new(&token); 30 - if token.is_err() { 31 - return json_error_response(StatusCode::BAD_REQUEST, "TokenRequired", "") 32 - .expect("Error creating an error response"); 33 - 34 - 35 - 36 - 37 - 38 - 39 - 40 - 41 - 42 - 43 - 44 - 45 - 46 - 47 - 48 - 49 - .expect("Error creating an error response"); 34 + Some((scheme, token_str)) => { 35 + // For Bearer, validate JWT and extract DID from `sub`. 36 + // For DPoP, we currently only pass through and do not validate here; insert None DID. 37 + match scheme { 38 + AuthScheme::Bearer => { 39 + let token = UntrustedToken::new(&token_str); 40 + if token.is_err() { 41 + return json_error_response( 42 + StatusCode::BAD_REQUEST, 43 + "TokenRequired", 44 + "", 45 + ) 46 + .expect("Error creating an error response"); 47 + } 48 + let parsed_token = token.expect("Already checked for error"); 49 + let claims: Result<Claims<TokenClaims>, ValidationError> = 50 + parsed_token.deserialize_claims_unchecked(); 51 + if claims.is_err() { 52 + return json_error_response( 53 + StatusCode::BAD_REQUEST, 54 + "TokenRequired", 55 + "", 56 + ) 57 + .expect("Error creating an error response"); 58 + } 59 + 60 + let key = Hs256Key::new( 61 + env::var("PDS_JWT_SECRET") 62 + .expect("PDS_JWT_SECRET not set in the pds.env"), 63 + ); 64 + let token: Result<Token<TokenClaims>, ValidationError> = 65 + Hs256.validator(&key).validate(&parsed_token); 66 + if token.is_err() { 67 + return json_error_response( 68 + StatusCode::BAD_REQUEST, 69 + "InvalidToken", 70 + "", 71 + ) 72 + .expect("Error creating an error response"); 73 + } 74 + let token = token.expect("Already checked for error,"); 75 + req.extensions_mut() 76 + .insert(Did(Some(token.claims().custom.sub.clone()))); 77 + } 78 + AuthScheme::DPoP => { 79 + //Not going to worry about oauth email update for now, just always forward to the PDS 80 + req.extensions_mut().insert(Did(None)); 81 + } 50 82 } 51 - let token = token.expect("Already checked for error,"); 52 - //Not going to worry about expiration since it still goes to the PDS 53 - req.extensions_mut() 54 - .insert(Did(Some(token.claims().custom.sub.clone()))); 83 + 55 84 next.run(req).await 56 85 } 57 86 } ··· 64 93 } 65 94 } 66 95 67 - fn extract_bearer(headers: &HeaderMap) -> Result<Option<String>, String> { 96 + fn extract_auth(headers: &HeaderMap) -> Result<Option<(AuthScheme, String)>, String> { 68 97 match headers.get(axum::http::header::AUTHORIZATION) { 69 98 None => Ok(None), 70 - Some(hv) => match hv.to_str() { 71 - Err(_) => Err("Authorization header is not valid".into()), 72 - Ok(s) => { 73 - // Accept forms like: "Bearer <token>" (case-sensitive for the scheme here) 74 - let mut parts = s.splitn(2, ' '); 75 - match (parts.next(), parts.next()) { 76 - (Some("Bearer"), Some(tok)) if !tok.is_empty() => Ok(Some(tok.to_string())), 77 - _ => Err("Authorization header must be in format 'Bearer <token>'".into()), 99 + Some(hv) => { 100 + match hv.to_str() { 101 + Err(_) => Err("Authorization header is not valid".into()), 102 + Ok(s) => { 103 + // Accept forms like: "Bearer <token>" or "DPoP <token>" (case-sensitive for the scheme here) 104 + let mut parts = s.splitn(2, ' '); 105 + match (parts.next(), parts.next()) { 106 + (Some("Bearer"), Some(tok)) if !tok.is_empty() => 107 + Ok(Some((AuthScheme::Bearer, tok.to_string()))), 108 + (Some("DPoP"), Some(tok)) if !tok.is_empty() => 109 + Ok(Some((AuthScheme::DPoP, tok.to_string()))), 110 + _ => Err("Authorization header must be in format 'Bearer <token>' or 'DPoP <token>'".into()), 111 + } 78 112 } 79 113 } 80 - }, 114 + } 81 115 } 82 116 }
+3 -3
Cargo.lock
··· 656 656 checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" 657 657 dependencies = [ 658 658 "libc", 659 - "windows-sys 0.52.0", 659 + "windows-sys 0.59.0", 660 660 ] 661 661 662 662 [[package]] ··· 1392 1392 checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" 1393 1393 dependencies = [ 1394 1394 "cfg-if", 1395 - "windows-targets 0.48.5", 1395 + "windows-targets 0.52.6", 1396 1396 ] 1397 1397 1398 1398 [[package]] ··· 2136 2136 "errno", 2137 2137 "libc", 2138 2138 "linux-raw-sys", 2139 - "windows-sys 0.52.0", 2139 + "windows-sys 0.59.0", 2140 2140 ] 2141 2141 2142 2142 [[package]]
+153 -54
src/xrpc/com_atproto_server.rs
··· 7 7 8 8 9 9 10 + use axum::{Extension, Json, debug_handler, extract, extract::Request}; 11 + use serde::{Deserialize, Serialize}; 12 + use serde_json; 13 + use tracing::log; 10 14 11 - 12 - 13 - 14 - 15 - 15 + #[derive(Serialize, Deserialize, Debug, Clone)] 16 16 17 17 18 18 ··· 147 147 //If email auth is set it is to either turn on or off 2fa 148 148 let email_auth_update = payload.email_auth_factor.unwrap_or(false); 149 149 150 - // Email update asked for 151 - if email_auth_update { 152 - let email = payload.email.clone(); 153 - let email_confirmed = sqlx::query_as::<_, (String,)>( 154 - "SELECT did FROM account WHERE emailConfirmedAt IS NOT NULL AND email = ?", 155 - ) 156 - .bind(&email) 157 - .fetch_optional(&state.account_pool) 158 - .await 159 - .map_err(|_| StatusCode::BAD_REQUEST)?; 160 - 161 - //Since the email is already confirmed we can enable 2fa 162 - return match email_confirmed { 163 - None => Err(StatusCode::BAD_REQUEST), 164 - Some(did_row) => { 165 - let _ = sqlx::query( 166 - "INSERT INTO two_factor_accounts (did, required) VALUES (?, 1) ON CONFLICT(did) DO UPDATE SET required = 1", 167 - ) 168 - .bind(&did_row.0) 169 - .execute(&state.pds_gatekeeper_pool) 170 - .await 171 - .map_err(|_| StatusCode::BAD_REQUEST)?; 172 - 173 - Ok(StatusCode::OK.into_response()) 174 - } 175 - }; 176 - } 177 - 178 - // User wants auth turned off 179 - if !email_auth_update && !email_auth_not_set { 180 - //User wants auth turned off and has a token 181 - if let Some(token) = &payload.token { 182 - let token_found = sqlx::query_as::<_, (String,)>( 183 - "SELECT token FROM email_token WHERE token = ? AND did = ? AND purpose = 'update_email'", 150 + //This means the middleware successfully extracted a did from the request, if not it just needs to be forward to the PDS 151 + //This is also empty if it is an oauth request, which is not supported by gatekeeper turning on 2fa since the dpop stuff needs to be implemented 152 + let did_is_not_empty = did.0.is_some(); 153 + 154 + if did_is_not_empty { 155 + // Email update asked for 156 + if email_auth_update { 157 + let email = payload.email.clone(); 158 + let email_confirmed = match sqlx::query_as::<_, (String,)>( 159 + "SELECT did FROM account WHERE emailConfirmedAt IS NOT NULL AND email = ?", 184 160 ) 185 - .bind(token) 186 - .bind(&did.0) 161 + .bind(&email) 187 162 .fetch_optional(&state.account_pool) 188 163 .await 189 - .map_err(|_| StatusCode::BAD_REQUEST)?; 164 + { 165 + Ok(row) => row, 166 + Err(err) => { 167 + log::error!("Error checking if email is confirmed: {err}"); 168 + return Err(StatusCode::BAD_REQUEST); 169 + } 170 + }; 171 + 172 + //Since the email is already confirmed we can enable 2fa 173 + return match email_confirmed { 174 + None => Err(StatusCode::BAD_REQUEST), 175 + Some(did_row) => { 176 + let _ = sqlx::query( 177 + "INSERT INTO two_factor_accounts (did, required) VALUES (?, 1) ON CONFLICT(did) DO UPDATE SET required = 1", 178 + ) 179 + .bind(&did_row.0) 180 + .execute(&state.pds_gatekeeper_pool) 181 + .await 182 + .map_err(|_| StatusCode::BAD_REQUEST)?; 183 + 184 + Ok(StatusCode::OK.into_response()) 185 + } 186 + }; 187 + } 190 188 191 - if token_found.is_some() { 192 - let _ = sqlx::query( 193 - "INSERT INTO two_factor_accounts (did, required) VALUES (?, 0) ON CONFLICT(did) DO UPDATE SET required = 0", 189 + // User wants auth turned off 190 + if !email_auth_update && !email_auth_not_set { 191 + //User wants auth turned off and has a token 192 + if let Some(token) = &payload.token { 193 + let token_found = match sqlx::query_as::<_, (String,)>( 194 + "SELECT token FROM email_token WHERE token = ? AND did = ? AND purpose = 'update_email'", 194 195 ) 195 - .bind(&did.0) 196 - .execute(&state.pds_gatekeeper_pool) 197 - .await 198 - .map_err(|_| StatusCode::BAD_REQUEST)?; 199 - 200 - return Ok(StatusCode::OK.into_response()); 201 - } else { 202 - return Err(StatusCode::BAD_REQUEST); 196 + .bind(token) 197 + .bind(&did.0) 198 + .fetch_optional(&state.account_pool) 199 + .await{ 200 + Ok(token) => token, 201 + Err(err) => { 202 + log::error!("Error checking if token is valid: {err}"); 203 + return Err(StatusCode::BAD_REQUEST); 204 + } 205 + }; 206 + 207 + return if token_found.is_some() { 208 + //TODO I think there may be a bug here and need to do some retry logic 209 + // First try was erroring, seconds was allowing 210 + match sqlx::query( 211 + "INSERT INTO two_factor_accounts (did, required) VALUES (?, 0) ON CONFLICT(did) DO UPDATE SET required = 0", 212 + ) 213 + .bind(&did.0) 214 + .execute(&state.pds_gatekeeper_pool) 215 + .await { 216 + Ok(_) => {} 217 + Err(err) => { 218 + log::error!("Error updating email auth: {err}"); 219 + return Err(StatusCode::BAD_REQUEST); 220 + } 221 + } 222 + 223 + Ok(StatusCode::OK.into_response()) 224 + } else { 225 + Err(StatusCode::BAD_REQUEST) 226 + }; 203 227 } 204 228 } 205 229 } 206 - 207 230 // Updating the actual email address by sending it on to the PDS 208 231 let uri = format!( 209 232 "{}{}", 233 + 234 + 235 + 236 + 237 + 238 + 239 + 240 + 241 + 242 + 243 + 244 + 245 + 246 + 247 + 248 + 249 + 250 + 251 + 252 + 253 + 254 + 255 + 256 + 257 + 258 + 259 + 260 + 261 + 262 + 263 + 264 + 265 + 266 + 267 + 268 + 269 + 270 + 271 + 272 + 273 + 274 + 275 + 276 + 277 + 278 + 279 + 280 + 281 + 282 + ProxiedResult::Passthrough(resp) => Ok(resp), 283 + } 284 + } 285 + 286 + pub async fn create_account( 287 + State(state): State<AppState>, 288 + mut req: Request, 289 + ) -> Result<Response<Body>, StatusCode> { 290 + //TODO if I add the block of only accounts authenticated just take the body as json here and grab the lxm token. No middle ware is needed 291 + 292 + let uri = format!( 293 + "{}{}", 294 + state.pds_base_url, "/xrpc/com.atproto.server.createAccount" 295 + ); 296 + 297 + // Rewrite the URI to point at the upstream PDS; keep headers, method, and body intact 298 + *req.uri_mut() = uri.parse().map_err(|_| StatusCode::BAD_REQUEST)?; 299 + 300 + let proxied = state 301 + .reverse_proxy_client 302 + .request(req) 303 + .await 304 + .map_err(|_| StatusCode::BAD_REQUEST)? 305 + .into_response(); 306 + 307 + Ok(proxied) 308 + }
+1
examples/Caddyfile
··· 14 14 path /xrpc/com.atproto.server.getSession 15 15 path /xrpc/com.atproto.server.updateEmail 16 16 path /xrpc/com.atproto.server.createSession 17 + path /xrpc/com.atproto.server.createAccount 17 18 path /@atproto/oauth-provider/~api/sign-in 18 19 } 19 20
+7
README.md
··· 113 113 `GATEKEEPER_HOST` - Host for pds gatekeeper. Defaults to `127.0.0.1` 114 114 115 115 `GATEKEEPER_PORT` - Port for pds gatekeeper. Defaults to `8080` 116 + 117 + `GATEKEEPER_CREATE_ACCOUNT_PER_SECOND` - Sets how often it takes a count off the limiter. example if you hit the rate 118 + limit of 5 and set to 60, then in 60 seconds you will be able to make one more. Or in 5 minutes be able to make 5 more. 119 + 120 + `GATEKEEPER_CREATE_ACCOUNT_BURST` - Sets how many requests can be made in a burst. In the prior example this is where 121 + the 5 comes from. Example can set this to 10 to allow for 10 requests in a burst, and after 60 seconds it will drop one 122 + off.

History

1 round 0 comments
sign up or login to add to the discussion
6 commits
expand
Works but I feel like it should be more secure
WIP
Create account rate limiting
logging
little more clean up
forgot some docs
expand 0 comments
pull request successfully merged