1#![feature(let_chains)]
2mod client;
3mod error;
4mod fs;
5mod resolver;
6
7use atrium_api::{client::AtpServiceClient, com, types};
8use atrium_common::resolver::Resolver;
9use atrium_identity::identity_resolver::ResolvedIdentity;
10use atrium_repo::{Repository, blockstore::CarStore};
11use atrium_xrpc_client::isahc::IsahcClient;
12use fuser::MountOption;
13use futures::{Stream, StreamExt, TryStream, TryStreamExt, stream};
14use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
15use std::{
16 collections::HashMap,
17 io::{Cursor, Write},
18 path::PathBuf,
19 sync::Arc,
20};
21use xdg::BaseDirectories;
22
23fn main() {
24 let rt = tokio::runtime::Runtime::new().unwrap();
25 let matches = clap::command!()
26 .arg(
27 clap::Arg::new("handles")
28 .index(1)
29 .required(true)
30 .num_args(1..)
31 .help("One or more handles to download and mount"),
32 )
33 .arg(
34 clap::Arg::new("mountpoint")
35 .short('m')
36 .action(clap::ArgAction::Set)
37 .value_parser(clap::value_parser!(PathBuf)),
38 )
39 .get_matches();
40 let handles = matches
41 .get_many::<String>("handles")
42 .unwrap()
43 .cloned()
44 .collect::<Vec<_>>();
45 let mountpoint = matches
46 .get_one::<PathBuf>("mountpoint")
47 .map(ToOwned::to_owned)
48 .unwrap_or(PathBuf::from("mnt"));
49 let _ = std::fs::create_dir_all(&mountpoint);
50
51 let resolver = Arc::new(resolver::id_resolver());
52 let bars = Arc::new(MultiProgress::new());
53 let repos = rt.block_on(
54 stream::iter(handles)
55 .then(|handle| {
56 let h = handle.clone();
57 let r = Arc::clone(&resolver);
58 let b = Arc::clone(&bars);
59 async move {
60 let id = r.resolve(&h).await?;
61 let bytes = cached_download(&id, &b).await?;
62 let repo = build_repo(bytes).await?;
63 Ok::<_, error::Error>((id.did, repo))
64 }
65 })
66 .collect::<Vec<_>>(),
67 );
68 let (success, errors): (Vec<_>, Vec<_>) = repos.into_iter().partition(|r| r.is_ok());
69 for e in errors {
70 eprintln!("{:?}", e.as_ref().unwrap_err());
71 }
72 let repos = success
73 .into_iter()
74 .map(|s| s.unwrap())
75 .collect::<HashMap<_, _>>();
76
77 // construct the fs
78 let mut fs = fs::PdsFs::new();
79 for (did, repo) in repos {
80 rt.block_on(fs.add(did, repo))
81 }
82
83 // mount
84 let options = vec![MountOption::RO, MountOption::FSName("pdsfs".to_string())];
85 let join_handle = fuser::spawn_mount2(fs, &mountpoint, &options).unwrap();
86
87 println!("mounted at {mountpoint:?}");
88 print!("hit enter to unmount and exit...");
89 std::io::stdout().flush().unwrap();
90
91 // Wait for user input
92 let mut input = String::new();
93 std::io::stdin().read_line(&mut input).unwrap();
94
95 join_handle.join();
96 std::fs::remove_dir(&mountpoint).unwrap();
97
98 println!("unmounted {mountpoint:?}");
99}
100
101async fn cached_download(
102 id: &ResolvedIdentity,
103 m: &MultiProgress,
104) -> Result<Vec<u8>, error::Error> {
105 let mut pb = ProgressBar::new_spinner();
106 pb.set_style(
107 ProgressStyle::default_spinner()
108 .template("{spinner:.green} [{elapsed_precise}] {msg}")
109 .unwrap()
110 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
111 );
112 pb.enable_steady_tick(std::time::Duration::from_millis(100));
113 pb = m.add(pb);
114
115 let dirs = BaseDirectories::new();
116
117 let dir = dirs.get_data_home().expect("$HOME is absent").join("pdsfs");
118 tokio::fs::create_dir_all(&dir).await?;
119
120 let file = dir.join(&id.did);
121 let exists = std::fs::exists(&file)?;
122
123 let bytes = if !exists {
124 pb.set_message(format!("downloading CAR file for...{}", id.did));
125 download_car_file(id, &pb).await?
126 } else {
127 pb.set_message(format!("using cached CAR file for...{}", id.did));
128 tokio::fs::read(&file).await?
129 };
130
131 // write to disk
132 if !exists {
133 tokio::fs::write(&file, &bytes).await?;
134 }
135
136 pb.finish();
137 Ok(bytes)
138}
139
140async fn download_car_file(
141 id: &ResolvedIdentity,
142 pb: &ProgressBar,
143) -> Result<Vec<u8>, error::Error> {
144 // download the entire car file first before mounting it as a fusefs
145 let client = AtpServiceClient::new(IsahcClient::new(&id.pds));
146 let did = types::string::Did::new(id.did.clone()).unwrap();
147
148 let bytes = client
149 .service
150 .com
151 .atproto
152 .sync
153 .get_repo(com::atproto::sync::get_repo::Parameters::from(
154 com::atproto::sync::get_repo::ParametersData { did, since: None },
155 ))
156 .await?;
157
158 pb.finish_with_message(format!("download complete for \t...\t{}", id.did));
159
160 Ok(bytes)
161}
162
163async fn build_repo(bytes: Vec<u8>) -> Result<Repository<CarStore<Cursor<Vec<u8>>>>, error::Error> {
164 let store = CarStore::open(Cursor::new(bytes)).await?;
165 let root = store.roots().next().unwrap();
166 let repo = Repository::open(store, root).await?;
167 Ok(repo)
168}