mount an atproto PDS repository as a FUSE filesystem
at main 6.1 kB view raw
1mod client; 2mod error; 3mod firehose; 4mod fs; 5mod resolver; 6 7use atrium_api::{client::AtpServiceClient, com, types}; 8use atrium_common::resolver::Resolver; 9use atrium_identity::identity_resolver::ResolvedIdentity; 10use atrium_repo::{Repository, blockstore::CarStore}; 11use atrium_xrpc_client::isahc::IsahcClient; 12use fuser::MountOption; 13use futures::{StreamExt, stream}; 14use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; 15use std::{ 16 io::{Cursor, Write}, 17 path::PathBuf, 18 sync::Arc, 19}; 20 21fn main() { 22 let rt = tokio::runtime::Runtime::new().unwrap(); 23 let matches = clap::command!() 24 .arg( 25 clap::Arg::new("handles") 26 .index(1) 27 .required(true) 28 .num_args(1..) 29 .help("One or more handles to download and mount"), 30 ) 31 .arg( 32 clap::Arg::new("mountpoint") 33 .short('m') 34 .action(clap::ArgAction::Set) 35 .value_parser(clap::value_parser!(PathBuf)), 36 ) 37 .get_matches(); 38 let handles = matches 39 .get_many::<String>("handles") 40 .unwrap() 41 .cloned() 42 .collect::<Vec<_>>(); 43 let mountpoint = matches 44 .get_one::<PathBuf>("mountpoint") 45 .map(ToOwned::to_owned) 46 .unwrap_or(PathBuf::from("mnt")); 47 let _ = std::fs::create_dir_all(&mountpoint); 48 49 let resolver = Arc::new(resolver::id_resolver()); 50 let bars = Arc::new(MultiProgress::new()); 51 let repos = rt.block_on( 52 stream::iter(handles) 53 .then(|handle| { 54 let h = handle.clone(); 55 let r = Arc::clone(&resolver); 56 let b = Arc::clone(&bars); 57 async move { 58 let id = r.resolve(&h).await?; 59 let bytes = cached_download(&id, &b).await?; 60 let repo = build_repo(bytes).await?; 61 Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo)) 62 } 63 }) 64 .collect::<Vec<_>>(), 65 ); 66 let (success, errors): (Vec<_>, Vec<_>) = repos.into_iter().partition(|r| r.is_ok()); 67 for e in errors { 68 eprintln!("{:?}", e.as_ref().unwrap_err()); 69 } 70 let repos_with_pds: Vec<_> = success 71 .into_iter() 72 .map(|s| s.unwrap()) 73 .collect(); 74 75 // construct the fs 76 let mut fs = fs::PdsFs::new(); 77 78 // Extract (did, pds) pairs for WebSocket tasks before consuming repos 79 let did_pds_pairs: Vec<_> = repos_with_pds.iter() 80 .map(|(did, pds, _)| (did.clone(), pds.clone())) 81 .collect(); 82 83 // Consume repos_with_pds to add repos to filesystem 84 for (did, _, repo) in repos_with_pds { 85 rt.block_on(fs.add(did, repo)) 86 } 87 88 // get shared state for WebSocket tasks 89 let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state(); 90 91 // mount 92 let options = vec![ 93 MountOption::RO, 94 MountOption::FSName("pdsfs".to_string()), 95 MountOption::AllowOther, 96 MountOption::CUSTOM("local".to_string()), 97 MountOption::CUSTOM("volname=pdsfs".to_string()), 98 ]; 99 100 // Create session and get notifier for Finder refresh 101 let session = fuser::Session::new(fs, &mountpoint, &options).unwrap(); 102 let notifier = session.notifier(); 103 let _bg = session.spawn().unwrap(); 104 105 // spawn WebSocket subscription tasks for each DID using the runtime handle 106 let rt_handle = rt.handle().clone(); 107 for (did, pds) in did_pds_pairs { 108 let inodes_clone = Arc::clone(&inodes_arc); 109 let sizes_clone = Arc::clone(&sizes_arc); 110 let content_cache_clone = Arc::clone(&content_cache_arc); 111 let notifier_clone = notifier.clone(); 112 113 rt_handle.spawn(async move { 114 if let Err(e) = firehose::subscribe_to_repo::<atrium_repo::blockstore::CarStore<std::io::Cursor<Vec<u8>>>>( 115 did, 116 pds, 117 inodes_clone, 118 sizes_clone, 119 content_cache_clone, 120 notifier_clone, 121 ).await { 122 eprintln!("WebSocket error: {:?}", e); 123 } 124 }); 125 } 126 127 println!("mounted at {mountpoint:?}"); 128 print!("hit enter to unmount and exit..."); 129 std::io::stdout().flush().unwrap(); 130 131 // Wait for user input 132 let mut input = String::new(); 133 std::io::stdin().read_line(&mut input).unwrap(); 134 135 println!("unmounted {mountpoint:?}"); 136} 137 138async fn cached_download( 139 id: &ResolvedIdentity, 140 m: &MultiProgress, 141) -> Result<Vec<u8>, error::Error> { 142 let mut pb = ProgressBar::new_spinner(); 143 pb.set_style( 144 ProgressStyle::default_spinner() 145 .template("{spinner:.green} [{elapsed_precise}] {msg}") 146 .unwrap() 147 .tick_strings(&["", "", "", "", "", "", "", "", "", ""]), 148 ); 149 pb.enable_steady_tick(std::time::Duration::from_millis(100)); 150 pb = m.add(pb); 151 152 // Always download fresh - no caching for now to ensure up-to-date data 153 pb.set_message(format!("downloading CAR file for...{}", id.did)); 154 let bytes = download_car_file(id, &pb).await?; 155 156 pb.finish(); 157 Ok(bytes) 158} 159 160async fn download_car_file( 161 id: &ResolvedIdentity, 162 pb: &ProgressBar, 163) -> Result<Vec<u8>, error::Error> { 164 // download the entire car file first before mounting it as a fusefs 165 let client = AtpServiceClient::new(IsahcClient::new(&id.pds)); 166 let did = types::string::Did::new(id.did.clone()).unwrap(); 167 168 let bytes = client 169 .service 170 .com 171 .atproto 172 .sync 173 .get_repo(com::atproto::sync::get_repo::Parameters::from( 174 com::atproto::sync::get_repo::ParametersData { did, since: None }, 175 )) 176 .await?; 177 178 pb.finish_with_message(format!("download complete for \t...\t{}", id.did)); 179 180 Ok(bytes) 181} 182 183async fn build_repo(bytes: Vec<u8>) -> Result<Repository<CarStore<Cursor<Vec<u8>>>>, error::Error> { 184 let store = CarStore::open(Cursor::new(bytes)).await?; 185 let root = store.roots().next().unwrap(); 186 let repo = Repository::open(store, root).await?; 187 Ok(repo) 188}