Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
96
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 7ae1422d8134c6d44be3e0ff9e95d36f647b7398 497 lines 18 kB view raw
1use jacquard_common::types::string::AtUri; 2use jacquard_common::types::blob::BlobRef; 3use jacquard_common::IntoStatic; 4use jacquard::client::{Agent, AgentSession, AgentSessionExt}; 5use jacquard::prelude::IdentityResolver; 6use miette::IntoDiagnostic; 7use std::collections::HashMap; 8 9use crate::place_wisp::fs::{Directory as FsDirectory, EntryNode as FsEntryNode}; 10use crate::place_wisp::subfs::SubfsRecord; 11 12/// Extract all subfs URIs from a directory tree with their mount paths 13pub fn extract_subfs_uris(directory: &FsDirectory, current_path: String) -> Vec<(String, String)> { 14 let mut uris = Vec::new(); 15 16 for entry in &directory.entries { 17 let full_path = if current_path.is_empty() { 18 entry.name.to_string() 19 } else { 20 format!("{}/{}", current_path, entry.name) 21 }; 22 23 match &entry.node { 24 FsEntryNode::Subfs(subfs_node) => { 25 // Found a subfs node - store its URI and mount path 26 uris.push((subfs_node.subject.to_string(), full_path.clone())); 27 } 28 FsEntryNode::Directory(subdir) => { 29 // Recursively search subdirectories 30 let sub_uris = extract_subfs_uris(subdir, full_path); 31 uris.extend(sub_uris); 32 } 33 FsEntryNode::File(_) => { 34 // Files don't contain subfs 35 } 36 FsEntryNode::Unknown(_) => { 37 // Skip unknown nodes 38 } 39 } 40 } 41 42 uris 43} 44 45/// Fetch a subfs record from the PDS 46pub async fn fetch_subfs_record( 47 agent: &Agent<impl AgentSession + IdentityResolver>, 48 uri: &str, 49) -> miette::Result<SubfsRecord<'static>> { 50 // Parse URI: at://did/collection/rkey 51 let parts: Vec<&str> = uri.trim_start_matches("at://").split('/').collect(); 52 53 if parts.len() < 3 { 54 return Err(miette::miette!("Invalid subfs URI: {}", uri)); 55 } 56 57 let _did = parts[0]; 58 let collection = parts[1]; 59 let _rkey = parts[2]; 60 61 if collection != "place.wisp.subfs" { 62 return Err(miette::miette!("Expected place.wisp.subfs collection, got: {}", collection)); 63 } 64 65 // Construct AT-URI for fetching 66 let 
at_uri = AtUri::new(uri).into_diagnostic()?; 67 68 // Fetch the record 69 let response = agent.get_record::<SubfsRecord>(&at_uri).await.into_diagnostic()?; 70 let record_output = response.into_output().into_diagnostic()?; 71 72 Ok(record_output.value.into_static()) 73} 74 75/// Recursively fetch all subfs records (including nested ones) 76/// Returns a list of (mount_path, SubfsRecord) tuples 77/// Note: Multiple records can have the same mount_path (for flat-merged chunks) 78pub async fn fetch_all_subfs_records_recursive( 79 agent: &Agent<impl AgentSession + IdentityResolver>, 80 initial_uris: Vec<(String, String)>, 81) -> miette::Result<Vec<(String, SubfsRecord<'static>)>> { 82 use futures::stream::{self, StreamExt}; 83 84 let mut all_subfs: Vec<(String, SubfsRecord<'static>)> = Vec::new(); 85 let mut fetched_uris: std::collections::HashSet<String> = std::collections::HashSet::new(); 86 let mut to_fetch = initial_uris; 87 88 if to_fetch.is_empty() { 89 return Ok(all_subfs); 90 } 91 92 println!("Found {} subfs records, fetching recursively...", to_fetch.len()); 93 94 let mut iteration = 0; 95 const MAX_ITERATIONS: usize = 10; 96 97 while !to_fetch.is_empty() && iteration < MAX_ITERATIONS { 98 iteration += 1; 99 println!(" Iteration {}: fetching {} subfs records...", iteration, to_fetch.len()); 100 101 let subfs_results: Vec<_> = stream::iter(to_fetch.clone()) 102 .map(|(uri, mount_path)| async move { 103 match fetch_subfs_record(agent, &uri).await { 104 Ok(record) => Some((mount_path, record, uri)), 105 Err(e) => { 106 eprintln!(" ⚠️ Failed to fetch subfs {}: {}", uri, e); 107 None 108 } 109 } 110 }) 111 .buffer_unordered(5) 112 .collect() 113 .await; 114 115 // Process results and find nested subfs 116 let mut newly_found_uris = Vec::new(); 117 for result in subfs_results { 118 if let Some((mount_path, record, uri)) = result { 119 println!(" ✓ Fetched subfs at {}", mount_path); 120 121 // Extract nested subfs URIs from this record 122 let nested_uris = 
extract_subfs_uris_from_subfs_dir(&record.root, mount_path.clone()); 123 newly_found_uris.extend(nested_uris); 124 125 all_subfs.push((mount_path, record)); 126 fetched_uris.insert(uri); 127 } 128 } 129 130 // Filter out already-fetched URIs (based on URI, not path) 131 to_fetch = newly_found_uris 132 .into_iter() 133 .filter(|(uri, _)| !fetched_uris.contains(uri)) 134 .collect(); 135 } 136 137 if iteration >= MAX_ITERATIONS { 138 eprintln!("⚠️ Max iterations reached while fetching nested subfs"); 139 } 140 141 println!(" Total subfs records fetched: {}", all_subfs.len()); 142 143 Ok(all_subfs) 144} 145 146/// Extract subfs URIs from a subfs::Directory 147fn extract_subfs_uris_from_subfs_dir( 148 directory: &crate::place_wisp::subfs::Directory, 149 current_path: String, 150) -> Vec<(String, String)> { 151 let mut uris = Vec::new(); 152 153 for entry in &directory.entries { 154 match &entry.node { 155 crate::place_wisp::subfs::EntryNode::Subfs(subfs_node) => { 156 // Check if this is a chunk entry (chunk0, chunk1, etc.) 
157 // Chunks should be flat-merged, so use the parent's path 158 let mount_path = if entry.name.starts_with("chunk") && 159 entry.name.chars().skip(5).all(|c| c.is_ascii_digit()) { 160 // This is a chunk - use parent's path for flat merge 161 println!(" → Found chunk {} at {}, will flat-merge to {}", entry.name, current_path, current_path); 162 current_path.clone() 163 } else { 164 // Normal subfs - append name to path 165 if current_path.is_empty() { 166 entry.name.to_string() 167 } else { 168 format!("{}/{}", current_path, entry.name) 169 } 170 }; 171 172 uris.push((subfs_node.subject.to_string(), mount_path)); 173 } 174 crate::place_wisp::subfs::EntryNode::Directory(subdir) => { 175 let full_path = if current_path.is_empty() { 176 entry.name.to_string() 177 } else { 178 format!("{}/{}", current_path, entry.name) 179 }; 180 let nested = extract_subfs_uris_from_subfs_dir(subdir, full_path); 181 uris.extend(nested); 182 } 183 _ => {} 184 } 185 } 186 187 uris 188} 189 190/// Merge blob maps from subfs records into the main blob map (RECURSIVE) 191/// Returns the total number of blobs merged from all subfs records 192pub async fn merge_subfs_blob_maps( 193 agent: &Agent<impl AgentSession + IdentityResolver>, 194 subfs_uris: Vec<(String, String)>, 195 main_blob_map: &mut HashMap<String, (BlobRef<'static>, String)>, 196) -> miette::Result<usize> { 197 // Fetch all subfs records recursively 198 let all_subfs = fetch_all_subfs_records_recursive(agent, subfs_uris).await?; 199 200 let mut total_merged = 0; 201 202 // Extract blobs from all fetched subfs records 203 // Skip parent records that only contain chunk references (no actual files) 204 for (mount_path, subfs_record) in all_subfs { 205 // Check if this record only contains chunk subfs references (no files) 206 let only_has_chunks = subfs_record.root.entries.iter().all(|e| { 207 matches!(&e.node, crate::place_wisp::subfs::EntryNode::Subfs(_)) && 208 e.name.starts_with("chunk") && 209 e.name.chars().skip(5).all(|c| 
c.is_ascii_digit()) 210 }); 211 212 if only_has_chunks && !subfs_record.root.entries.is_empty() { 213 // This is a parent containing only chunks - skip it, blobs are in the chunks 214 println!(" → Skipping parent subfs at {} ({} chunks, no files)", mount_path, subfs_record.root.entries.len()); 215 continue; 216 } 217 218 let subfs_blob_map = extract_subfs_blobs(&subfs_record.root, mount_path.clone()); 219 let count = subfs_blob_map.len(); 220 221 for (path, blob_info) in subfs_blob_map { 222 main_blob_map.insert(path, blob_info); 223 } 224 225 total_merged += count; 226 println!(" ✓ Merged {} blobs from subfs at {}", count, mount_path); 227 } 228 229 Ok(total_merged) 230} 231 232/// Extract blobs from a subfs directory (works with subfs::Directory) 233/// Returns a map of file paths to their blob refs and CIDs 234fn extract_subfs_blobs( 235 directory: &crate::place_wisp::subfs::Directory, 236 current_path: String, 237) -> HashMap<String, (BlobRef<'static>, String)> { 238 let mut blob_map = HashMap::new(); 239 240 for entry in &directory.entries { 241 let full_path = if current_path.is_empty() { 242 entry.name.to_string() 243 } else { 244 format!("{}/{}", current_path, entry.name) 245 }; 246 247 match &entry.node { 248 crate::place_wisp::subfs::EntryNode::File(file_node) => { 249 let blob_ref = &file_node.blob; 250 let cid_string = blob_ref.blob().r#ref.to_string(); 251 blob_map.insert( 252 full_path, 253 (blob_ref.clone().into_static(), cid_string) 254 ); 255 } 256 crate::place_wisp::subfs::EntryNode::Directory(subdir) => { 257 let sub_map = extract_subfs_blobs(subdir, full_path); 258 blob_map.extend(sub_map); 259 } 260 crate::place_wisp::subfs::EntryNode::Subfs(_nested_subfs) => { 261 // Nested subfs - these should be resolved recursively in the main flow 262 // For now, we skip them (they'll be fetched separately) 263 eprintln!(" ⚠️ Found nested subfs at {}, skipping (should be fetched separately)", full_path); 264 } 265 
crate::place_wisp::subfs::EntryNode::Unknown(_) => { 266 // Skip unknown nodes 267 } 268 } 269 } 270 271 blob_map 272} 273 274/// Count total files in a directory tree 275pub fn count_files_in_directory(directory: &FsDirectory) -> usize { 276 let mut count = 0; 277 278 for entry in &directory.entries { 279 match &entry.node { 280 FsEntryNode::File(_) => count += 1, 281 FsEntryNode::Directory(subdir) => { 282 count += count_files_in_directory(subdir); 283 } 284 FsEntryNode::Subfs(_) => { 285 // Subfs nodes don't count towards the main manifest file count 286 } 287 FsEntryNode::Unknown(_) => {} 288 } 289 } 290 291 count 292} 293 294/// Estimate JSON size of a directory tree 295pub fn estimate_directory_size(directory: &FsDirectory) -> usize { 296 // Serialize to JSON and measure 297 match serde_json::to_string(directory) { 298 Ok(json) => json.len(), 299 Err(_) => 0, 300 } 301} 302 303/// Information about a directory that could be split into a subfs record 304#[derive(Debug)] 305pub struct SplittableDirectory { 306 pub path: String, 307 pub directory: FsDirectory<'static>, 308 pub size: usize, 309 pub file_count: usize, 310} 311 312/// Find large directories that could be split into subfs records 313/// Returns directories sorted by size (largest first) 314pub fn find_large_directories(directory: &FsDirectory, current_path: String) -> Vec<SplittableDirectory> { 315 let mut result = Vec::new(); 316 317 for entry in &directory.entries { 318 if let FsEntryNode::Directory(subdir) = &entry.node { 319 let dir_path = if current_path.is_empty() { 320 entry.name.to_string() 321 } else { 322 format!("{}/{}", current_path, entry.name) 323 }; 324 325 let size = estimate_directory_size(subdir); 326 let file_count = count_files_in_directory(subdir); 327 328 result.push(SplittableDirectory { 329 path: dir_path.clone(), 330 directory: (*subdir.clone()).into_static(), 331 size, 332 file_count, 333 }); 334 335 // Recursively find subdirectories 336 let subdirs = 
find_large_directories(subdir, dir_path); 337 result.extend(subdirs); 338 } 339 } 340 341 // Sort by size (largest first) 342 result.sort_by(|a, b| b.size.cmp(&a.size)); 343 344 result 345} 346 347/// Replace a directory with a subfs node in the tree 348pub fn replace_directory_with_subfs( 349 directory: FsDirectory<'static>, 350 target_path: &str, 351 subfs_uri: &str, 352 flat: bool, 353) -> miette::Result<FsDirectory<'static>> { 354 use jacquard_common::CowStr; 355 use crate::place_wisp::fs::{Entry, Subfs}; 356 357 let path_parts: Vec<&str> = target_path.split('/').collect(); 358 359 if path_parts.is_empty() { 360 return Err(miette::miette!("Cannot replace root directory")); 361 } 362 363 // Parse the subfs URI and make it owned/'static 364 let at_uri = AtUri::new_cow(jacquard_common::CowStr::from(subfs_uri.to_string())).into_diagnostic()?; 365 366 // If this is a root-level directory 367 if path_parts.len() == 1 { 368 let target_name = path_parts[0]; 369 let new_entries: Vec<Entry> = directory.entries.into_iter().map(|entry| { 370 if entry.name == target_name { 371 // Replace this directory with a subfs node 372 Entry::new() 373 .name(entry.name) 374 .node(FsEntryNode::Subfs(Box::new( 375 Subfs::new() 376 .r#type(CowStr::from("subfs")) 377 .subject(at_uri.clone()) 378 .flat(Some(flat)) 379 .build() 380 ))) 381 .build() 382 } else { 383 entry 384 } 385 }).collect(); 386 387 return Ok(FsDirectory::new() 388 .r#type(CowStr::from("directory")) 389 .entries(new_entries) 390 .build()); 391 } 392 393 // Recursively navigate to parent directory 394 let first_part = path_parts[0]; 395 let remaining_path = path_parts[1..].join("/"); 396 397 let new_entries: Vec<Entry> = directory.entries.into_iter().filter_map(|entry| { 398 if entry.name == first_part { 399 if let FsEntryNode::Directory(subdir) = entry.node { 400 // Recursively process this subdirectory 401 match replace_directory_with_subfs((*subdir).into_static(), &remaining_path, subfs_uri, flat) { 402 
Ok(updated_subdir) => { 403 Some(Entry::new() 404 .name(entry.name) 405 .node(FsEntryNode::Directory(Box::new(updated_subdir))) 406 .build()) 407 } 408 Err(_) => None, // Skip entries that fail to update 409 } 410 } else { 411 Some(entry) 412 } 413 } else { 414 Some(entry) 415 } 416 }).collect(); 417 418 Ok(FsDirectory::new() 419 .r#type(CowStr::from("directory")) 420 .entries(new_entries) 421 .build()) 422} 423 424/// Delete a subfs record from the PDS 425pub async fn delete_subfs_record( 426 agent: &Agent<impl AgentSession + IdentityResolver>, 427 uri: &str, 428) -> miette::Result<()> { 429 use jacquard_common::types::uri::RecordUri; 430 431 // Construct AT-URI and convert to RecordUri 432 let at_uri = AtUri::new(uri).into_diagnostic()?; 433 let record_uri: RecordUri<'_, crate::place_wisp::subfs::SubfsRecordRecord> = RecordUri::try_from_uri(at_uri).into_diagnostic()?; 434 435 let rkey = record_uri.rkey() 436 .ok_or_else(|| miette::miette!("Invalid subfs URI: missing rkey"))? 437 .clone(); 438 439 agent.delete_record::<SubfsRecord>(rkey).await.into_diagnostic()?; 440 441 Ok(()) 442} 443 444/// Split a large directory into multiple smaller chunks 445/// Returns a list of chunk directories, each small enough to fit in a subfs record 446pub fn split_directory_into_chunks( 447 directory: &FsDirectory, 448 max_size: usize, 449) -> Vec<FsDirectory<'static>> { 450 use jacquard_common::CowStr; 451 452 let mut chunks = Vec::new(); 453 let mut current_chunk_entries = Vec::new(); 454 let mut current_chunk_size = 100; // Base size for directory structure 455 456 for entry in &directory.entries { 457 // Estimate the size of this entry 458 let entry_size = estimate_entry_size(entry); 459 460 // If adding this entry would exceed the max size, start a new chunk 461 if !current_chunk_entries.is_empty() && (current_chunk_size + entry_size > max_size) { 462 // Create a chunk from current entries 463 let chunk = FsDirectory::new() 464 .r#type(CowStr::from("directory")) 465 
.entries(current_chunk_entries.clone()) 466 .build(); 467 468 chunks.push(chunk); 469 470 // Start new chunk 471 current_chunk_entries.clear(); 472 current_chunk_size = 100; 473 } 474 475 current_chunk_entries.push(entry.clone().into_static()); 476 current_chunk_size += entry_size; 477 } 478 479 // Add the last chunk if it has any entries 480 if !current_chunk_entries.is_empty() { 481 let chunk = FsDirectory::new() 482 .r#type(CowStr::from("directory")) 483 .entries(current_chunk_entries) 484 .build(); 485 chunks.push(chunk); 486 } 487 488 chunks 489} 490 491/// Estimate the JSON size of a single entry 492fn estimate_entry_size(entry: &crate::place_wisp::fs::Entry) -> usize { 493 match serde_json::to_string(entry) { 494 Ok(json) => json.len(), 495 Err(_) => 500, // Conservative estimate if serialization fails 496 } 497}