Rust implementation of OCI Distribution Spec with granular access control
at main 313 lines 10 kB view raw
1use serde::{Deserialize, Serialize}; 2use std::collections::{HashMap, HashSet}; 3use std::path::Path; 4use std::time::{SystemTime, UNIX_EPOCH}; 5 6type BlobLocation = (String, String, u64); // (org, repo, size) 7type UnreferencedBlob = (String, String, String, u64); // (org, repo, digest, size) 8 9#[derive(Debug, Serialize, Deserialize)] 10pub struct GcStats { 11 pub blobs_scanned: usize, 12 pub manifests_scanned: usize, 13 pub blobs_referenced: usize, 14 pub blobs_unreferenced: usize, 15 pub blobs_deleted: usize, 16 pub bytes_freed: u64, 17 pub duration_seconds: u64, 18} 19 20/// Run garbage collection with optional dry-run mode 21pub fn run_gc( 22 dry_run: bool, 23 grace_period_hours: u64, 24) -> Result<GcStats, Box<dyn std::error::Error>> { 25 let start_time = SystemTime::now(); 26 27 let mut stats = GcStats { 28 blobs_scanned: 0, 29 manifests_scanned: 0, 30 blobs_referenced: 0, 31 blobs_unreferenced: 0, 32 blobs_deleted: 0, 33 bytes_freed: 0, 34 duration_seconds: 0, 35 }; 36 37 log::info!("Starting garbage collection (dry_run: {})", dry_run); 38 39 // Step 1: Scan all manifests and build referenced blob set 40 let referenced_blobs = scan_manifests(&mut stats)?; 41 stats.blobs_referenced = referenced_blobs.len(); 42 43 log::info!( 44 "Found {} referenced blobs from {} manifests", 45 stats.blobs_referenced, 46 stats.manifests_scanned 47 ); 48 49 // Step 2: Scan all blobs and identify unreferenced ones 50 let all_blobs = scan_all_blobs(&mut stats)?; 51 52 log::info!("Scanned {} total blobs", stats.blobs_scanned); 53 54 // Step 3: Mark unreferenced blobs 55 let unreferenced_blobs = mark_unreferenced_blobs(&all_blobs, &referenced_blobs)?; 56 stats.blobs_unreferenced = unreferenced_blobs.len(); 57 58 log::info!("Identified {} unreferenced blobs", stats.blobs_unreferenced); 59 60 // Step 4: Sweep marked blobs that are past grace period 61 if !dry_run { 62 sweep_marked_blobs(&unreferenced_blobs, grace_period_hours, &mut stats)?; 63 log::info!( 64 "Deleted {} blobs, freed {} bytes", 65 stats.blobs_deleted, 66 stats.bytes_freed 67 ); 68 } else { 69 log::info!("DRY RUN: Would delete {} blobs", unreferenced_blobs.len()); 70 } 71 72 stats.duration_seconds = start_time.elapsed()?.as_secs(); 73 74 Ok(stats) 75} 76 77/// Scan all manifests and extract referenced blob digests 78fn scan_manifests(stats: &mut GcStats) -> Result<HashSet<String>, Box<dyn std::error::Error>> { 79 let mut referenced = HashSet::new(); 80 let manifests_dir = Path::new("./tmp/manifests"); 81 82 if !manifests_dir.exists() { 83 return Ok(referenced); 84 } 85 86 // Walk through org/repo/manifest structure 87 for org_entry in std::fs::read_dir(manifests_dir)? { 88 let org_entry = org_entry?; 89 if !org_entry.path().is_dir() { 90 continue; 91 } 92 93 for repo_entry in std::fs::read_dir(org_entry.path())? { 94 let repo_entry = repo_entry?; 95 if !repo_entry.path().is_dir() { 96 continue; 97 } 98 99 for manifest_entry in std::fs::read_dir(repo_entry.path())? { 100 let manifest_entry = manifest_entry?; 101 if !manifest_entry.path().is_file() { 102 continue; 103 } 104 105 stats.manifests_scanned += 1; 106 107 // Read and parse manifest 108 if let Ok(manifest_data) = std::fs::read(manifest_entry.path()) { 109 if let Ok(manifest_str) = std::str::from_utf8(&manifest_data) { 110 extract_blob_references(manifest_str, &mut referenced); 111 } 112 } 113 } 114 } 115 } 116 117 Ok(referenced) 118} 119 120/// Extract blob digest references from manifest JSON 121fn extract_blob_references(manifest_json: &str, referenced: &mut HashSet<String>) { 122 if let Ok(manifest) = serde_json::from_str::<serde_json::Value>(manifest_json) { 123 // Extract config digest 124 if let Some(config) = manifest.get("config") { 125 if let Some(digest) = config.get("digest").and_then(|d| d.as_str()) { 126 let clean_digest = digest.strip_prefix("sha256:").unwrap_or(digest); 127 referenced.insert(clean_digest.to_string()); 128 } 129 } 130 131 // Extract layer digests 132 if let Some(layers) = manifest.get("layers").and_then(|l| l.as_array()) { 133 for layer in layers { 134 if let Some(digest) = layer.get("digest").and_then(|d| d.as_str()) { 135 let clean_digest = digest.strip_prefix("sha256:").unwrap_or(digest); 136 referenced.insert(clean_digest.to_string()); 137 } 138 } 139 } 140 141 // Extract manifests from image index 142 if let Some(manifests) = manifest.get("manifests").and_then(|m| m.as_array()) { 143 for manifest_desc in manifests { 144 if let Some(digest) = manifest_desc.get("digest").and_then(|d| d.as_str()) { 145 let clean_digest = digest.strip_prefix("sha256:").unwrap_or(digest); 146 referenced.insert(clean_digest.to_string()); 147 } 148 } 149 } 150 } 151} 152 153/// Scan all blobs in storage 154fn scan_all_blobs( 155 stats: &mut GcStats, 156) -> Result<HashMap<String, Vec<BlobLocation>>, Box<dyn std::error::Error>> { 157 let mut all_blobs: HashMap<String, Vec<BlobLocation>> = HashMap::new(); 158 let blobs_dir = Path::new("./tmp/blobs"); 159 160 if !blobs_dir.exists() { 161 return Ok(all_blobs); 162 } 163 164 for org_entry in std::fs::read_dir(blobs_dir)? { 165 let org_entry = org_entry?; 166 if !org_entry.path().is_dir() { 167 continue; 168 } 169 170 let org = org_entry.file_name().to_string_lossy().to_string(); 171 172 for repo_entry in std::fs::read_dir(org_entry.path())? { 173 let repo_entry = repo_entry?; 174 if !repo_entry.path().is_dir() { 175 continue; 176 } 177 178 let repo = repo_entry.file_name().to_string_lossy().to_string(); 179 180 for blob_entry in std::fs::read_dir(repo_entry.path())? { 181 let blob_entry = blob_entry?; 182 if !blob_entry.path().is_file() { 183 continue; 184 } 185 186 stats.blobs_scanned += 1; 187 188 let digest = blob_entry.file_name().to_string_lossy().to_string(); 189 let size = blob_entry.metadata()?.len(); 190 191 // Track all locations for this digest 192 all_blobs 193 .entry(digest) 194 .or_default() 195 .push((org.clone(), repo.clone(), size)); 196 } 197 } 198 } 199 200 Ok(all_blobs) 201} 202 203/// Mark unreferenced blobs for deletion 204fn mark_unreferenced_blobs( 205 all_blobs: &HashMap<String, Vec<BlobLocation>>, 206 referenced_blobs: &HashSet<String>, 207) -> Result<Vec<UnreferencedBlob>, Box<dyn std::error::Error>> { 208 let mut unreferenced = Vec::new(); 209 210 for (digest, locations) in all_blobs { 211 if !referenced_blobs.contains(digest) { 212 // Add all locations of this unreferenced blob 213 for (org, repo, size) in locations { 214 unreferenced.push((org.clone(), repo.clone(), digest.clone(), *size)); 215 } 216 } 217 } 218 219 Ok(unreferenced) 220} 221 222/// Sweep (delete) marked blobs that are past grace period 223fn sweep_marked_blobs( 224 unreferenced_blobs: &[UnreferencedBlob], 225 grace_period_hours: u64, 226 stats: &mut GcStats, 227) -> Result<(), Box<dyn std::error::Error>> { 228 let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(); 229 let grace_period_secs = grace_period_hours * 3600; 230 231 for (org, repo, digest, size) in unreferenced_blobs { 232 // Check blob modification time 233 let blob_path = format!("./tmp/blobs/{}/{}/{}", org, repo, digest); 234 235 if let Ok(metadata) = std::fs::metadata(&blob_path) { 236 if let Ok(modified) = metadata.modified() { 237 let modified_secs = modified.duration_since(UNIX_EPOCH)?.as_secs(); 238 let age_secs = now.saturating_sub(modified_secs); 239 240 // Only delete if past grace period 241 if age_secs >= grace_period_secs { 242 match std::fs::remove_file(&blob_path) { 243 Ok(()) => { 244 log::info!( 245 "Deleted unreferenced blob: {}/{}/{} ({} bytes)", 246 org, 247 repo, 248 digest, 249 size 250 ); 251 stats.blobs_deleted += 1; 252 stats.bytes_freed += size; 253 } 254 Err(e) => { 255 log::warn!("Failed to delete blob {}: {}", blob_path, e); 256 } 257 } 258 } else { 259 log::debug!( 260 "Blob {} still in grace period ({} hours old)", 261 digest, 262 age_secs / 3600 263 ); 264 } 265 } 266 } 267 } 268 269 Ok(()) 270} 271 272#[cfg(test)] 273mod tests { 274 use super::*; 275 276 #[test] 277 fn test_extract_blob_references() { 278 let manifest = r#"{ 279 "config": { 280 "digest": "sha256:abc123" 281 }, 282 "layers": [ 283 {"digest": "sha256:layer1"}, 284 {"digest": "sha256:layer2"} 285 ] 286 }"#; 287 288 let mut referenced = HashSet::new(); 289 extract_blob_references(manifest, &mut referenced); 290 291 assert_eq!(referenced.len(), 3); 292 assert!(referenced.contains("abc123")); 293 assert!(referenced.contains("layer1")); 294 assert!(referenced.contains("layer2")); 295 } 296 297 #[test] 298 fn test_extract_image_index_references() { 299 let manifest = r#"{ 300 "manifests": [ 301 {"digest": "sha256:manifest1"}, 302 {"digest": "sha256:manifest2"} 303 ] 304 }"#; 305 306 let mut referenced = HashSet::new(); 307 extract_blob_references(manifest, &mut referenced); 308 309 assert_eq!(referenced.len(), 2); 310 assert!(referenced.contains("manifest1")); 311 assert!(referenced.contains("manifest2")); 312 } 313}