mount an atproto PDS repository as a FUSE filesystem
1#![feature(let_chains)] 2mod client; 3mod error; 4mod fs; 5mod resolver; 6 7use atrium_api::{client::AtpServiceClient, com, types}; 8use atrium_common::resolver::Resolver; 9use atrium_identity::identity_resolver::ResolvedIdentity; 10use atrium_repo::{Repository, blockstore::CarStore}; 11use atrium_xrpc_client::isahc::IsahcClient; 12use fuser::MountOption; 13use futures::{Stream, StreamExt, TryStream, TryStreamExt, stream}; 14use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; 15use std::{ 16 collections::HashMap, 17 io::{Cursor, Write}, 18 path::PathBuf, 19 sync::Arc, 20}; 21use xdg::BaseDirectories; 22 23fn main() { 24 let rt = tokio::runtime::Runtime::new().unwrap(); 25 let matches = clap::command!() 26 .arg( 27 clap::Arg::new("handles") 28 .index(1) 29 .required(true) 30 .num_args(1..) 31 .help("One or more handles to download and mount"), 32 ) 33 .arg( 34 clap::Arg::new("mountpoint") 35 .short('m') 36 .action(clap::ArgAction::Set) 37 .value_parser(clap::value_parser!(PathBuf)), 38 ) 39 .get_matches(); 40 let handles = matches 41 .get_many::<String>("handles") 42 .unwrap() 43 .cloned() 44 .collect::<Vec<_>>(); 45 let mountpoint = matches 46 .get_one::<PathBuf>("mountpoint") 47 .map(ToOwned::to_owned) 48 .unwrap_or(PathBuf::from("mnt")); 49 let _ = std::fs::create_dir_all(&mountpoint); 50 51 let resolver = Arc::new(resolver::id_resolver()); 52 let bars = Arc::new(MultiProgress::new()); 53 let repos = rt.block_on( 54 stream::iter(handles) 55 .then(|handle| { 56 let h = handle.clone(); 57 let r = Arc::clone(&resolver); 58 let b = Arc::clone(&bars); 59 async move { 60 let id = r.resolve(&h).await?; 61 let bytes = cached_download(&id, &b).await?; 62 let repo = build_repo(bytes).await?; 63 Ok::<_, error::Error>((id.did, repo)) 64 } 65 }) 66 .collect::<Vec<_>>(), 67 ); 68 let (success, errors): (Vec<_>, Vec<_>) = repos.into_iter().partition(|r| r.is_ok()); 69 for e in errors { 70 eprintln!("{:?}", e.as_ref().unwrap_err()); 71 } 72 let repos = success 73 .into_iter() 74 .map(|s| s.unwrap()) 75 .collect::<HashMap<_, _>>(); 76 77 // construct the fs 78 let mut fs = fs::PdsFs::new(); 79 for (did, repo) in repos { 80 rt.block_on(fs.add(did, repo)) 81 } 82 83 // mount 84 let options = vec![MountOption::RO, MountOption::FSName("pdsfs".to_string())]; 85 let join_handle = fuser::spawn_mount2(fs, &mountpoint, &options).unwrap(); 86 87 println!("mounted at {mountpoint:?}"); 88 print!("hit enter to unmount and exit..."); 89 std::io::stdout().flush().unwrap(); 90 91 // Wait for user input 92 let mut input = String::new(); 93 std::io::stdin().read_line(&mut input).unwrap(); 94 95 join_handle.join(); 96 std::fs::remove_dir(&mountpoint).unwrap(); 97 98 println!("unmounted {mountpoint:?}"); 99} 100 101async fn cached_download( 102 id: &ResolvedIdentity, 103 m: &MultiProgress, 104) -> Result<Vec<u8>, error::Error> { 105 let mut pb = ProgressBar::new_spinner(); 106 pb.set_style( 107 ProgressStyle::default_spinner() 108 .template("{spinner:.green} [{elapsed_precise}] {msg}") 109 .unwrap() 110 .tick_strings(&["", "", "", "", "", "", "", "", "", ""]), 111 ); 112 pb.enable_steady_tick(std::time::Duration::from_millis(100)); 113 pb = m.add(pb); 114 115 let dirs = BaseDirectories::new(); 116 117 let dir = dirs.get_data_home().expect("$HOME is absent").join("pdsfs"); 118 tokio::fs::create_dir_all(&dir).await?; 119 120 let file = dir.join(&id.did); 121 let exists = std::fs::exists(&file)?; 122 123 let bytes = if !exists { 124 pb.set_message(format!("downloading CAR file for...{}", id.did)); 125 download_car_file(id, &pb).await? 126 } else { 127 pb.set_message(format!("using cached CAR file for...{}", id.did)); 128 tokio::fs::read(&file).await? 129 }; 130 131 // write to disk 132 if !exists { 133 tokio::fs::write(&file, &bytes).await?; 134 } 135 136 pb.finish(); 137 Ok(bytes) 138} 139 140async fn download_car_file( 141 id: &ResolvedIdentity, 142 pb: &ProgressBar, 143) -> Result<Vec<u8>, error::Error> { 144 // download the entire car file first before mounting it as a fusefs 145 let client = AtpServiceClient::new(IsahcClient::new(&id.pds)); 146 let did = types::string::Did::new(id.did.clone()).unwrap(); 147 148 let bytes = client 149 .service 150 .com 151 .atproto 152 .sync 153 .get_repo(com::atproto::sync::get_repo::Parameters::from( 154 com::atproto::sync::get_repo::ParametersData { did, since: None }, 155 )) 156 .await?; 157 158 pb.finish_with_message(format!("download complete for \t...\t{}", id.did)); 159 160 Ok(bytes) 161} 162 163async fn build_repo(bytes: Vec<u8>) -> Result<Repository<CarStore<Cursor<Vec<u8>>>>, error::Error> { 164 let store = CarStore::open(Cursor::new(bytes)).await?; 165 let root = store.roots().next().unwrap(); 166 let repo = Repository::open(store, root).await?; 167 Ok(repo) 168}