Rust implementation of OCI Distribution Spec with granular access control
1// | ID | Method | API Endpoint | Success | Failure |
2// | ------ | -------------- | ------------------------------------------------------------ | ----------- | ----------------- |
3// | end-1 | `GET` | `/v2/` | `200` | `404`/`401` |
4// | end-2 | `GET` / `HEAD` | `/v2/<name>/blobs/<digest>` | `200` | `404` |
5// | end-4a | `POST` | `/v2/<name>/blobs/uploads/` | `202` | `404` |
6// | end-4b | `POST` | `/v2/<name>/blobs/uploads/?digest=<digest>` | `201`/`202` | `404`/`400` |
7// | end-5 | `PATCH` | `/v2/<name>/blobs/uploads/<reference>` | `202` | `404`/`416` |
8// | end-6 | `PUT` | `/v2/<name>/blobs/uploads/<reference>?digest=<digest>` | `201` | `404`/`400` |
9// | end-10 | `DELETE` | `/v2/<name>/blobs/<digest>` | `202` | `404`/`405` |
10// | end-11 | `POST` | `/v2/<name>/blobs/uploads/?mount=<digest>&from=<other_name>` | `201` | `404` |
11
12use serde::Deserialize;
13use std::sync::Arc;
14
15use crate::{
16 auth, metrics, permissions, response, state,
17 storage::{self, write_blob},
18};
19use axum::{
20 body::Body,
21 extract::{Path, Query, State},
22 http::{HeaderMap, StatusCode},
23 response::Response,
24};
25use bytes::Bytes;
26
27// end-2 GET /v2/:name/blobs/:digest
28pub(crate) async fn get_blob_by_digest(
29 State(state): State<Arc<state::App>>,
30 Path((org, repo, digest_string)): Path<(String, String, String)>,
31 headers: HeaderMap,
32) -> Response<Body> {
33 log::info!(
34 "blobs/get_blob_by_digest: org: {}, repo {}, digest: {}",
35 org,
36 repo,
37 digest_string
38 );
39
40 let host = &state.args.host;
41 let repository = format!("{}/{}", org, repo);
42
43 // Check permission (Pull for blob retrieval)
44 match auth::check_permission(
45 &state,
46 &headers,
47 &repository,
48 None,
49 permissions::Action::Pull,
50 )
51 .await
52 {
53 Ok(_) => {}
54 Err(_) => {
55 return if auth::authenticate_user(&state, &headers).await.is_ok() {
56 response::forbidden()
57 } else {
58 response::unauthorized(host)
59 };
60 }
61 }
62
63 // Strip sha256: prefix if present
64 let clean_digest = digest_string
65 .strip_prefix("sha256:")
66 .unwrap_or(&digest_string);
67
68 // Read blob from storage
69 match storage::read_blob(&org, &repo, clean_digest) {
70 Ok(blob_data) => {
71 metrics::BLOB_DOWNLOADS_TOTAL.inc();
72 Response::builder()
73 .status(StatusCode::OK)
74 .header("Content-Length", blob_data.len().to_string())
75 .header("Docker-Content-Digest", format!("sha256:{}", clean_digest))
76 .header("Content-Type", "application/octet-stream")
77 .body(Body::from(blob_data))
78 .unwrap()
79 }
80 Err(e) => {
81 log::warn!(
82 "blobs/get_blob_by_digest: blob not found: {}/{}/{}: {}",
83 org,
84 repo,
85 clean_digest,
86 e
87 );
88 response::blob_unknown(&format!("sha256:{}", clean_digest))
89 }
90 }
91}
92
93// end-2 HEAD /v2/:name/blobs/:digest
94pub(crate) async fn head_blob_by_digest(
95 State(state): State<Arc<state::App>>,
96 Path((org, repo, digest_string)): Path<(String, String, String)>,
97 headers: HeaderMap,
98) -> Response<Body> {
99 log::info!(
100 "blobs/head_blob_by_digest: org: {}, repo {}, digest: {}",
101 org,
102 repo,
103 digest_string
104 );
105
106 let host = &state.args.host;
107 let repository = format!("{}/{}", org, repo);
108
109 // Check permission (Pull for blob retrieval)
110 match auth::check_permission(
111 &state,
112 &headers,
113 &repository,
114 None,
115 permissions::Action::Pull,
116 )
117 .await
118 {
119 Ok(_) => {}
120 Err(_) => {
121 return if auth::authenticate_user(&state, &headers).await.is_ok() {
122 Response::builder()
123 .status(StatusCode::FORBIDDEN)
124 .body(Body::empty())
125 .unwrap()
126 } else {
127 Response::builder()
128 .status(StatusCode::UNAUTHORIZED)
129 .header(
130 "WWW-Authenticate",
131 format!("Basic realm=\"{}\", charset=\"UTF-8\"", host),
132 )
133 .body(Body::empty())
134 .unwrap()
135 };
136 }
137 }
138
139 // Strip sha256: prefix if present
140 let clean_digest = digest_string
141 .strip_prefix("sha256:")
142 .unwrap_or(&digest_string);
143
144 // Check if blob exists and get metadata
145 match storage::blob_metadata(&org, &repo, clean_digest) {
146 Ok(metadata) => Response::builder()
147 .status(StatusCode::OK)
148 .header("Content-Length", metadata.len().to_string())
149 .header("Docker-Content-Digest", format!("sha256:{}", clean_digest))
150 .header("Content-Type", "application/octet-stream")
151 .body(Body::empty())
152 .unwrap(),
153 Err(e) => {
154 log::warn!(
155 "blobs/head_blob_by_digest: blob not found: {}/{}/{}: {}",
156 org,
157 repo,
158 clean_digest,
159 e
160 );
161 response::blob_unknown(&format!("sha256:{}", clean_digest))
162 }
163 }
164}
165
166// end-4a POST /v2/:name/blobs/uploads/
167// end-4b POST /v2/:name/blobs/uploads/?digest=:digest
168// end-11 POST /v2/:name/blobs/uploads/?mount=:digest&from=:other_name
169#[derive(Deserialize)]
170pub(crate) struct PostBlobUploadQueryParams {
171 digest: Option<String>,
172 mount: Option<String>,
173 from: Option<String>,
174}
175
176pub(crate) async fn post_blob_upload(
177 State(state): State<Arc<state::App>>,
178 Path((org, repo)): Path<(String, String)>,
179 Query(params): Query<PostBlobUploadQueryParams>,
180 headers: HeaderMap,
181 body: Bytes,
182) -> Response<Body> {
183 log::info!("blobs/post_blob_upload: org: {}, repo: {}", org, repo);
184
185 let host = &state.args.host;
186 let repository = format!("{}/{}", org, repo);
187
188 // Check permission (Push for blob upload)
189 match auth::check_permission(
190 &state,
191 &headers,
192 &repository,
193 None,
194 permissions::Action::Push,
195 )
196 .await
197 {
198 Ok(_) => {}
199 Err(_) => {
200 return if auth::authenticate_user(&state, &headers).await.is_ok() {
201 response::forbidden()
202 } else {
203 response::unauthorized(host)
204 };
205 }
206 }
207
208 // Handle blob mounting (end-11)
209 if let (Some(mount_digest), Some(from_repo)) = (¶ms.mount, ¶ms.from) {
210 let clean_digest = mount_digest.strip_prefix("sha256:").unwrap_or(mount_digest);
211
212 // Parse source repository (format: "org/repo")
213 let from_parts: Vec<&str> = from_repo.split('/').collect();
214 if from_parts.len() == 2 {
215 let source_org = from_parts[0];
216 let source_repo = from_parts[1];
217 let source_repository = format!("{}/{}", source_org, source_repo);
218
219 // Check if user has pull permission on source repository
220 if auth::check_permission(
221 &state,
222 &headers,
223 &source_repository,
224 None,
225 permissions::Action::Pull,
226 )
227 .await
228 .is_ok()
229 {
230 // Attempt to mount blob
231 match storage::mount_blob(source_org, source_repo, &org, &repo, clean_digest) {
232 Ok(()) => {
233 log::info!(
234 "Mounted blob {} from {} to {}",
235 clean_digest,
236 from_repo,
237 repository
238 );
239
240 let location = format!(
241 "http://{}/v2/{}/{}/blobs/sha256:{}",
242 host, org, repo, clean_digest
243 );
244
245 return Response::builder()
246 .status(StatusCode::CREATED)
247 .header("Location", location)
248 .header("Docker-Content-Digest", format!("sha256:{}", clean_digest))
249 .body(Body::empty())
250 .unwrap();
251 }
252 Err(e) => {
253 log::warn!(
254 "Failed to mount blob {}: {} - falling back to upload",
255 clean_digest,
256 e
257 );
258 // Fall through to regular upload session creation
259 }
260 }
261 } else {
262 log::warn!("User lacks permission to mount from {}", from_repo);
263 // Fall through to regular upload
264 }
265 }
266 }
267
268 // If digest is provided, handle monolithic upload (end-4b)
269 if let Some(digest_string) = params.digest {
270 let success = write_blob(&org, &repo, &digest_string, Body::from(body)).await;
271
272 if !success {
273 return response::digest_invalid(&digest_string);
274 }
275
276 metrics::BLOB_UPLOADS_TOTAL.inc();
277
278 let clean_digest = digest_string
279 .strip_prefix("sha256:")
280 .unwrap_or(&digest_string);
281
282 return Response::builder()
283 .status(StatusCode::CREATED)
284 .header(
285 "Location",
286 format!(
287 "http://{}/v2/{}/{}/blobs/sha256:{}",
288 host, org, repo, clean_digest
289 ),
290 )
291 .header("Docker-Content-Digest", format!("sha256:{}", clean_digest))
292 .body(Body::empty())
293 .unwrap();
294 }
295
296 // Create new upload session (end-4a)
297 let uuid = uuid::Uuid::new_v4().to_string();
298
299 if let Err(e) = storage::init_upload_session(&org, &repo, &uuid) {
300 log::error!("Failed to init upload session: {}", e);
301 return response::internal_error();
302 }
303
304 let location = format!("http://{}/v2/{}/{}/blobs/uploads/{}", host, org, repo, uuid);
305
306 Response::builder()
307 .status(StatusCode::ACCEPTED)
308 .header("Location", location)
309 .header("Range", "0-0")
310 .header("Docker-Upload-UUID", uuid)
311 .body(Body::empty())
312 .unwrap()
313}
314
315// end-5 PATCH /v2/:name/blobs/uploads/:reference
316pub(crate) async fn patch_blob_upload(
317 State(state): State<Arc<state::App>>,
318 Path((org, repo, uuid)): Path<(String, String, String)>,
319 headers: HeaderMap,
320 body: Bytes,
321) -> Response<Body> {
322 log::info!(
323 "blobs/patch_blob_upload: org: {}, repo: {}, uuid: {}",
324 org,
325 repo,
326 uuid
327 );
328
329 let host = &state.args.host;
330 let repository = format!("{}/{}", org, repo);
331
332 // Check permission (Push for blob upload)
333 match auth::check_permission(
334 &state,
335 &headers,
336 &repository,
337 None,
338 permissions::Action::Push,
339 )
340 .await
341 {
342 Ok(_) => {}
343 Err(_) => {
344 return if auth::authenticate_user(&state, &headers).await.is_ok() {
345 response::forbidden()
346 } else {
347 response::unauthorized(host)
348 };
349 }
350 }
351
352 match storage::append_upload_chunk(&org, &repo, &uuid, &body) {
353 Ok(total_size) => {
354 let location = format!("http://{}/v2/{}/{}/blobs/uploads/{}", host, org, repo, uuid);
355
356 Response::builder()
357 .status(StatusCode::ACCEPTED)
358 .header("Location", location)
359 .header("Range", format!("0-{}", total_size.saturating_sub(1)))
360 .header("Docker-Upload-UUID", &uuid)
361 .body(Body::empty())
362 .unwrap()
363 }
364 Err(e) => {
365 log::error!("Failed to append chunk for upload {}: {}", uuid, e);
366 response::blob_upload_unknown(&uuid)
367 }
368 }
369}
370
371// end-6 PUT /v2/:name/blobs/uploads/:reference?digest=:digest
372#[derive(Deserialize)]
373pub(crate) struct End6QueryParams {
374 digest: String,
375}
376
377pub(crate) async fn put_blob_upload_by_reference(
378 State(state): State<Arc<state::App>>,
379 Path((org, repo, uuid)): Path<(String, String, String)>,
380 Query(params): Query<End6QueryParams>,
381 headers: HeaderMap,
382 body: Bytes,
383) -> Response<Body> {
384 log::info!(
385 "blobs/put_blob_upload_by_reference: org: {}, repo: {}, uuid: {}, digest: {}",
386 org,
387 repo,
388 uuid,
389 params.digest
390 );
391
392 let host = &state.args.host;
393 let repository = format!("{}/{}", org, repo);
394
395 // Check permission (Push for blob upload)
396 match auth::check_permission(
397 &state,
398 &headers,
399 &repository,
400 None,
401 permissions::Action::Push,
402 )
403 .await
404 {
405 Ok(_) => {}
406 Err(_) => {
407 return if auth::authenticate_user(&state, &headers).await.is_ok() {
408 response::forbidden()
409 } else {
410 response::unauthorized(host)
411 };
412 }
413 }
414
415 // Append final chunk if body is not empty
416 if !body.is_empty() {
417 if let Err(e) = storage::append_upload_chunk(&org, &repo, &uuid, &body) {
418 log::error!("Failed to append final chunk: {}", e);
419 return response::internal_error();
420 }
421 }
422
423 // Finalize upload and validate digest
424 match storage::finalize_upload(&org, &repo, &uuid, ¶ms.digest) {
425 Ok(actual_digest) => {
426 metrics::BLOB_UPLOADS_TOTAL.inc();
427
428 let location = format!(
429 "http://{}/v2/{}/{}/blobs/sha256:{}",
430 host, org, repo, actual_digest
431 );
432
433 Response::builder()
434 .status(StatusCode::CREATED)
435 .header("Location", location)
436 .header("Docker-Content-Digest", format!("sha256:{}", actual_digest))
437 .body(Body::empty())
438 .unwrap()
439 }
440 Err(e) => {
441 log::error!("Failed to finalize upload: {}", e);
442
443 // Clean up failed upload
444 let _ = storage::delete_upload_session(&org, &repo, &uuid);
445
446 if e.contains("Digest mismatch") {
447 response::digest_invalid(¶ms.digest)
448 } else {
449 response::internal_error()
450 }
451 }
452 }
453}
454
455// end-10 DELETE /v2/:name/blobs/:digest
456pub(crate) async fn delete_blob_by_digest(
457 State(state): State<Arc<state::App>>,
458 Path((org, repo, digest_string)): Path<(String, String, String)>,
459 headers: HeaderMap,
460) -> Response<Body> {
461 let host = &state.args.host;
462 let repository = format!("{}/{}", org, repo);
463
464 // Check permission (Delete for blob deletion)
465 match auth::check_permission(
466 &state,
467 &headers,
468 &repository,
469 None,
470 permissions::Action::Delete,
471 )
472 .await
473 {
474 Ok(_) => {}
475 Err(_) => {
476 return if auth::authenticate_user(&state, &headers).await.is_ok() {
477 response::forbidden()
478 } else {
479 response::unauthorized(host)
480 };
481 }
482 }
483
484 // Clean digest (strip sha256: prefix if present)
485 let clean_digest = digest_string
486 .strip_prefix("sha256:")
487 .unwrap_or(&digest_string);
488
489 log::info!(
490 "blobs/delete_blob_by_digest: org: {}, repo: {}, digest: {}",
491 org,
492 repo,
493 clean_digest
494 );
495
496 // Delete blob
497 match storage::delete_blob(&org, &repo, clean_digest) {
498 Ok(()) => {
499 log::info!("Deleted blob {}/{}/{}", org, repo, clean_digest);
500
501 Response::builder()
502 .status(StatusCode::ACCEPTED)
503 .body(Body::empty())
504 .unwrap()
505 }
506 Err(e) => {
507 if e.kind() == std::io::ErrorKind::NotFound {
508 log::warn!(
509 "Attempted to delete non-existent blob {}/{}/{}",
510 org,
511 repo,
512 clean_digest
513 );
514 response::blob_unknown(&format!("sha256:{}", clean_digest))
515 } else {
516 log::error!(
517 "Failed to delete blob {}/{}/{}: {}",
518 org,
519 repo,
520 clean_digest,
521 e
522 );
523 response::internal_error()
524 }
525 }
526 }
527}