Rust implementation of OCI Distribution Spec with granular access control
at main 527 lines 17 kB view raw
1// | ID | Method | API Endpoint | Success | Failure | 2// | ------ | -------------- | ------------------------------------------------------------ | ----------- | ----------------- | 3// | end-1 | `GET` | `/v2/` | `200` | `404`/`401` | 4// | end-2 | `GET` / `HEAD` | `/v2/<name>/blobs/<digest>` | `200` | `404` | 5// | end-4a | `POST` | `/v2/<name>/blobs/uploads/` | `202` | `404` | 6// | end-4b | `POST` | `/v2/<name>/blobs/uploads/?digest=<digest>` | `201`/`202` | `404`/`400` | 7// | end-5 | `PATCH` | `/v2/<name>/blobs/uploads/<reference>` | `202` | `404`/`416` | 8// | end-6 | `PUT` | `/v2/<name>/blobs/uploads/<reference>?digest=<digest>` | `201` | `404`/`400` | 9// | end-10 | `DELETE` | `/v2/<name>/blobs/<digest>` | `202` | `404`/`405` | 10// | end-11 | `POST` | `/v2/<name>/blobs/uploads/?mount=<digest>&from=<other_name>` | `201` | `404` | 11 12use serde::Deserialize; 13use std::sync::Arc; 14 15use crate::{ 16 auth, metrics, permissions, response, state, 17 storage::{self, write_blob}, 18}; 19use axum::{ 20 body::Body, 21 extract::{Path, Query, State}, 22 http::{HeaderMap, StatusCode}, 23 response::Response, 24}; 25use bytes::Bytes; 26 27// end-2 GET /v2/:name/blobs/:digest 28pub(crate) async fn get_blob_by_digest( 29 State(state): State<Arc<state::App>>, 30 Path((org, repo, digest_string)): Path<(String, String, String)>, 31 headers: HeaderMap, 32) -> Response<Body> { 33 log::info!( 34 "blobs/get_blob_by_digest: org: {}, repo {}, digest: {}", 35 org, 36 repo, 37 digest_string 38 ); 39 40 let host = &state.args.host; 41 let repository = format!("{}/{}", org, repo); 42 43 // Check permission (Pull for blob retrieval) 44 match auth::check_permission( 45 &state, 46 &headers, 47 &repository, 48 None, 49 permissions::Action::Pull, 50 ) 51 .await 52 { 53 Ok(_) => {} 54 Err(_) => { 55 return if auth::authenticate_user(&state, &headers).await.is_ok() { 56 response::forbidden() 57 } else { 58 response::unauthorized(host) 59 }; 60 } 61 } 62 63 // Strip sha256: prefix if present 64 let clean_digest = digest_string 65 .strip_prefix("sha256:") 66 .unwrap_or(&digest_string); 67 68 // Read blob from storage 69 match storage::read_blob(&org, &repo, clean_digest) { 70 Ok(blob_data) => { 71 metrics::BLOB_DOWNLOADS_TOTAL.inc(); 72 Response::builder() 73 .status(StatusCode::OK) 74 .header("Content-Length", blob_data.len().to_string()) 75 .header("Docker-Content-Digest", format!("sha256:{}", clean_digest)) 76 .header("Content-Type", "application/octet-stream") 77 .body(Body::from(blob_data)) 78 .unwrap() 79 } 80 Err(e) => { 81 log::warn!( 82 "blobs/get_blob_by_digest: blob not found: {}/{}/{}: {}", 83 org, 84 repo, 85 clean_digest, 86 e 87 ); 88 response::blob_unknown(&format!("sha256:{}", clean_digest)) 89 } 90 } 91} 92 93// end-2 HEAD /v2/:name/blobs/:digest 94pub(crate) async fn head_blob_by_digest( 95 State(state): State<Arc<state::App>>, 96 Path((org, repo, digest_string)): Path<(String, String, String)>, 97 headers: HeaderMap, 98) -> Response<Body> { 99 log::info!( 100 "blobs/head_blob_by_digest: org: {}, repo {}, digest: {}", 101 org, 102 repo, 103 digest_string 104 ); 105 106 let host = &state.args.host; 107 let repository = format!("{}/{}", org, repo); 108 109 // Check permission (Pull for blob retrieval) 110 match auth::check_permission( 111 &state, 112 &headers, 113 &repository, 114 None, 115 permissions::Action::Pull, 116 ) 117 .await 118 { 119 Ok(_) => {} 120 Err(_) => { 121 return if auth::authenticate_user(&state, &headers).await.is_ok() { 122 Response::builder() 123 .status(StatusCode::FORBIDDEN) 124 .body(Body::empty()) 125 .unwrap() 126 } else { 127 Response::builder() 128 .status(StatusCode::UNAUTHORIZED) 129 .header( 130 "WWW-Authenticate", 131 format!("Basic realm=\"{}\", charset=\"UTF-8\"", host), 132 ) 133 .body(Body::empty()) 134 .unwrap() 135 }; 136 } 137 } 138 139 // Strip sha256: prefix if present 140 let clean_digest = digest_string 141 .strip_prefix("sha256:") 142 .unwrap_or(&digest_string); 143 144 // Check if blob exists and get metadata 145 match storage::blob_metadata(&org, &repo, clean_digest) { 146 Ok(metadata) => Response::builder() 147 .status(StatusCode::OK) 148 .header("Content-Length", metadata.len().to_string()) 149 .header("Docker-Content-Digest", format!("sha256:{}", clean_digest)) 150 .header("Content-Type", "application/octet-stream") 151 .body(Body::empty()) 152 .unwrap(), 153 Err(e) => { 154 log::warn!( 155 "blobs/head_blob_by_digest: blob not found: {}/{}/{}: {}", 156 org, 157 repo, 158 clean_digest, 159 e 160 ); 161 response::blob_unknown(&format!("sha256:{}", clean_digest)) 162 } 163 } 164} 165 166// end-4a POST /v2/:name/blobs/uploads/ 167// end-4b POST /v2/:name/blobs/uploads/?digest=:digest 168// end-11 POST /v2/:name/blobs/uploads/?mount=:digest&from=:other_name 169#[derive(Deserialize)] 170pub(crate) struct PostBlobUploadQueryParams { 171 digest: Option<String>, 172 mount: Option<String>, 173 from: Option<String>, 174} 175 176pub(crate) async fn post_blob_upload( 177 State(state): State<Arc<state::App>>, 178 Path((org, repo)): Path<(String, String)>, 179 Query(params): Query<PostBlobUploadQueryParams>, 180 headers: HeaderMap, 181 body: Bytes, 182) -> Response<Body> { 183 log::info!("blobs/post_blob_upload: org: {}, repo: {}", org, repo); 184 185 let host = &state.args.host; 186 let repository = format!("{}/{}", org, repo); 187 188 // Check permission (Push for blob upload) 189 match auth::check_permission( 190 &state, 191 &headers, 192 &repository, 193 None, 194 permissions::Action::Push, 195 ) 196 .await 197 { 198 Ok(_) => {} 199 Err(_) => { 200 return if auth::authenticate_user(&state, &headers).await.is_ok() { 201 response::forbidden() 202 } else { 203 response::unauthorized(host) 204 }; 205 } 206 } 207 208 // Handle blob mounting (end-11) 209 if let (Some(mount_digest), Some(from_repo)) = (&params.mount, &params.from) { 210 let clean_digest = mount_digest.strip_prefix("sha256:").unwrap_or(mount_digest); 211 212 // Parse source repository (format: "org/repo") 213 let from_parts: Vec<&str> = from_repo.split('/').collect(); 214 if from_parts.len() == 2 { 215 let source_org = from_parts[0]; 216 let source_repo = from_parts[1]; 217 let source_repository = format!("{}/{}", source_org, source_repo); 218 219 // Check if user has pull permission on source repository 220 if auth::check_permission( 221 &state, 222 &headers, 223 &source_repository, 224 None, 225 permissions::Action::Pull, 226 ) 227 .await 228 .is_ok() 229 { 230 // Attempt to mount blob 231 match storage::mount_blob(source_org, source_repo, &org, &repo, clean_digest) { 232 Ok(()) => { 233 log::info!( 234 "Mounted blob {} from {} to {}", 235 clean_digest, 236 from_repo, 237 repository 238 ); 239 240 let location = format!( 241 "http://{}/v2/{}/{}/blobs/sha256:{}", 242 host, org, repo, clean_digest 243 ); 244 245 return Response::builder() 246 .status(StatusCode::CREATED) 247 .header("Location", location) 248 .header("Docker-Content-Digest", format!("sha256:{}", clean_digest)) 249 .body(Body::empty()) 250 .unwrap(); 251 } 252 Err(e) => { 253 log::warn!( 254 "Failed to mount blob {}: {} - falling back to upload", 255 clean_digest, 256 e 257 ); 258 // Fall through to regular upload session creation 259 } 260 } 261 } else { 262 log::warn!("User lacks permission to mount from {}", from_repo); 263 // Fall through to regular upload 264 } 265 } 266 } 267 268 // If digest is provided, handle monolithic upload (end-4b) 269 if let Some(digest_string) = params.digest { 270 let success = write_blob(&org, &repo, &digest_string, Body::from(body)).await; 271 272 if !success { 273 return response::digest_invalid(&digest_string); 274 } 275 276 metrics::BLOB_UPLOADS_TOTAL.inc(); 277 278 let clean_digest = digest_string 279 .strip_prefix("sha256:") 280 .unwrap_or(&digest_string); 281 282 return Response::builder() 283 .status(StatusCode::CREATED) 284 .header( 285 "Location", 286 format!( 287 "http://{}/v2/{}/{}/blobs/sha256:{}", 288 host, org, repo, clean_digest 289 ), 290 ) 291 .header("Docker-Content-Digest", format!("sha256:{}", clean_digest)) 292 .body(Body::empty()) 293 .unwrap(); 294 } 295 296 // Create new upload session (end-4a) 297 let uuid = uuid::Uuid::new_v4().to_string(); 298 299 if let Err(e) = storage::init_upload_session(&org, &repo, &uuid) { 300 log::error!("Failed to init upload session: {}", e); 301 return response::internal_error(); 302 } 303 304 let location = format!("http://{}/v2/{}/{}/blobs/uploads/{}", host, org, repo, uuid); 305 306 Response::builder() 307 .status(StatusCode::ACCEPTED) 308 .header("Location", location) 309 .header("Range", "0-0") 310 .header("Docker-Upload-UUID", uuid) 311 .body(Body::empty()) 312 .unwrap() 313} 314 315// end-5 PATCH /v2/:name/blobs/uploads/:reference 316pub(crate) async fn patch_blob_upload( 317 State(state): State<Arc<state::App>>, 318 Path((org, repo, uuid)): Path<(String, String, String)>, 319 headers: HeaderMap, 320 body: Bytes, 321) -> Response<Body> { 322 log::info!( 323 "blobs/patch_blob_upload: org: {}, repo: {}, uuid: {}", 324 org, 325 repo, 326 uuid 327 ); 328 329 let host = &state.args.host; 330 let repository = format!("{}/{}", org, repo); 331 332 // Check permission (Push for blob upload) 333 match auth::check_permission( 334 &state, 335 &headers, 336 &repository, 337 None, 338 permissions::Action::Push, 339 ) 340 .await 341 { 342 Ok(_) => {} 343 Err(_) => { 344 return if auth::authenticate_user(&state, &headers).await.is_ok() { 345 response::forbidden() 346 } else { 347 response::unauthorized(host) 348 }; 349 } 350 } 351 352 match storage::append_upload_chunk(&org, &repo, &uuid, &body) { 353 Ok(total_size) => { 354 let location = format!("http://{}/v2/{}/{}/blobs/uploads/{}", host, org, repo, uuid); 355 356 Response::builder() 357 .status(StatusCode::ACCEPTED) 358 .header("Location", location) 359 .header("Range", format!("0-{}", total_size.saturating_sub(1))) 360 .header("Docker-Upload-UUID", &uuid) 361 .body(Body::empty()) 362 .unwrap() 363 } 364 Err(e) => { 365 log::error!("Failed to append chunk for upload {}: {}", uuid, e); 366 response::blob_upload_unknown(&uuid) 367 } 368 } 369} 370 371// end-6 PUT /v2/:name/blobs/uploads/:reference?digest=:digest 372#[derive(Deserialize)] 373pub(crate) struct End6QueryParams { 374 digest: String, 375} 376 377pub(crate) async fn put_blob_upload_by_reference( 378 State(state): State<Arc<state::App>>, 379 Path((org, repo, uuid)): Path<(String, String, String)>, 380 Query(params): Query<End6QueryParams>, 381 headers: HeaderMap, 382 body: Bytes, 383) -> Response<Body> { 384 log::info!( 385 "blobs/put_blob_upload_by_reference: org: {}, repo: {}, uuid: {}, digest: {}", 386 org, 387 repo, 388 uuid, 389 params.digest 390 ); 391 392 let host = &state.args.host; 393 let repository = format!("{}/{}", org, repo); 394 395 // Check permission (Push for blob upload) 396 match auth::check_permission( 397 &state, 398 &headers, 399 &repository, 400 None, 401 permissions::Action::Push, 402 ) 403 .await 404 { 405 Ok(_) => {} 406 Err(_) => { 407 return if auth::authenticate_user(&state, &headers).await.is_ok() { 408 response::forbidden() 409 } else { 410 response::unauthorized(host) 411 }; 412 } 413 } 414 415 // Append final chunk if body is not empty 416 if !body.is_empty() { 417 if let Err(e) = storage::append_upload_chunk(&org, &repo, &uuid, &body) { 418 log::error!("Failed to append final chunk: {}", e); 419 return response::internal_error(); 420 } 421 } 422 423 // Finalize upload and validate digest 424 match storage::finalize_upload(&org, &repo, &uuid, &params.digest) { 425 Ok(actual_digest) => { 426 metrics::BLOB_UPLOADS_TOTAL.inc(); 427 428 let location = format!( 429 "http://{}/v2/{}/{}/blobs/sha256:{}", 430 host, org, repo, actual_digest 431 ); 432 433 Response::builder() 434 .status(StatusCode::CREATED) 435 .header("Location", location) 436 .header("Docker-Content-Digest", format!("sha256:{}", actual_digest)) 437 .body(Body::empty()) 438 .unwrap() 439 } 440 Err(e) => { 441 log::error!("Failed to finalize upload: {}", e); 442 443 // Clean up failed upload 444 let _ = storage::delete_upload_session(&org, &repo, &uuid); 445 446 if e.contains("Digest mismatch") { 447 response::digest_invalid(&params.digest) 448 } else { 449 response::internal_error() 450 } 451 } 452 } 453} 454 455// end-10 DELETE /v2/:name/blobs/:digest 456pub(crate) async fn delete_blob_by_digest( 457 State(state): State<Arc<state::App>>, 458 Path((org, repo, digest_string)): Path<(String, String, String)>, 459 headers: HeaderMap, 460) -> Response<Body> { 461 let host = &state.args.host; 462 let repository = format!("{}/{}", org, repo); 463 464 // Check permission (Delete for blob deletion) 465 match auth::check_permission( 466 &state, 467 &headers, 468 &repository, 469 None, 470 permissions::Action::Delete, 471 ) 472 .await 473 { 474 Ok(_) => {} 475 Err(_) => { 476 return if auth::authenticate_user(&state, &headers).await.is_ok() { 477 response::forbidden() 478 } else { 479 response::unauthorized(host) 480 }; 481 } 482 } 483 484 // Clean digest (strip sha256: prefix if present) 485 let clean_digest = digest_string 486 .strip_prefix("sha256:") 487 .unwrap_or(&digest_string); 488 489 log::info!( 490 "blobs/delete_blob_by_digest: org: {}, repo: {}, digest: {}", 491 org, 492 repo, 493 clean_digest 494 ); 495 496 // Delete blob 497 match storage::delete_blob(&org, &repo, clean_digest) { 498 Ok(()) => { 499 log::info!("Deleted blob {}/{}/{}", org, repo, clean_digest); 500 501 Response::builder() 502 .status(StatusCode::ACCEPTED) 503 .body(Body::empty()) 504 .unwrap() 505 } 506 Err(e) => { 507 if e.kind() == std::io::ErrorKind::NotFound { 508 log::warn!( 509 "Attempted to delete non-existent blob {}/{}/{}", 510 org, 511 repo, 512 clean_digest 513 ); 514 response::blob_unknown(&format!("sha256:{}", clean_digest)) 515 } else { 516 log::error!( 517 "Failed to delete blob {}/{}/{}: {}", 518 org, 519 repo, 520 clean_digest, 521 e 522 ); 523 response::internal_error() 524 } 525 } 526 } 527}