1mod client;
2mod error;
3mod firehose;
4mod fs;
5mod resolver;
6
7use atrium_api::{client::AtpServiceClient, com, types};
8use atrium_common::resolver::Resolver;
9use atrium_identity::identity_resolver::ResolvedIdentity;
10use atrium_repo::{Repository, blockstore::CarStore};
11use atrium_xrpc_client::isahc::IsahcClient;
12use fuser::MountOption;
13use futures::{StreamExt, stream};
14use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
15use std::{
16 io::{Cursor, Write},
17 path::PathBuf,
18 sync::Arc,
19};
20
21fn main() {
22 let rt = tokio::runtime::Runtime::new().unwrap();
23 let matches = clap::command!()
24 .arg(
25 clap::Arg::new("handles")
26 .index(1)
27 .required(true)
28 .num_args(1..)
29 .help("One or more handles to download and mount"),
30 )
31 .arg(
32 clap::Arg::new("mountpoint")
33 .short('m')
34 .action(clap::ArgAction::Set)
35 .value_parser(clap::value_parser!(PathBuf)),
36 )
37 .get_matches();
38 let handles = matches
39 .get_many::<String>("handles")
40 .unwrap()
41 .cloned()
42 .collect::<Vec<_>>();
43 let mountpoint = matches
44 .get_one::<PathBuf>("mountpoint")
45 .map(ToOwned::to_owned)
46 .unwrap_or(PathBuf::from("mnt"));
47 let _ = std::fs::create_dir_all(&mountpoint);
48
49 let resolver = Arc::new(resolver::id_resolver());
50 let bars = Arc::new(MultiProgress::new());
51 let repos = rt.block_on(
52 stream::iter(handles)
53 .then(|handle| {
54 let h = handle.clone();
55 let r = Arc::clone(&resolver);
56 let b = Arc::clone(&bars);
57 async move {
58 let id = r.resolve(&h).await?;
59 let bytes = cached_download(&id, &b).await?;
60 let repo = build_repo(bytes).await?;
61 Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo))
62 }
63 })
64 .collect::<Vec<_>>(),
65 );
66 let (success, errors): (Vec<_>, Vec<_>) = repos.into_iter().partition(|r| r.is_ok());
67 for e in errors {
68 eprintln!("{:?}", e.as_ref().unwrap_err());
69 }
70 let repos_with_pds: Vec<_> = success
71 .into_iter()
72 .map(|s| s.unwrap())
73 .collect();
74
75 // construct the fs
76 let mut fs = fs::PdsFs::new();
77
78 // Extract (did, pds) pairs for WebSocket tasks before consuming repos
79 let did_pds_pairs: Vec<_> = repos_with_pds.iter()
80 .map(|(did, pds, _)| (did.clone(), pds.clone()))
81 .collect();
82
83 // Consume repos_with_pds to add repos to filesystem
84 for (did, _, repo) in repos_with_pds {
85 rt.block_on(fs.add(did, repo))
86 }
87
88 // get shared state for WebSocket tasks
89 let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state();
90
91 // mount
92 let options = vec![
93 MountOption::RO,
94 MountOption::FSName("pdsfs".to_string()),
95 MountOption::AllowOther,
96 MountOption::CUSTOM("local".to_string()),
97 MountOption::CUSTOM("volname=pdsfs".to_string()),
98 ];
99
100 // Create session and get notifier for Finder refresh
101 let session = fuser::Session::new(fs, &mountpoint, &options).unwrap();
102 let notifier = session.notifier();
103 let _bg = session.spawn().unwrap();
104
105 // spawn WebSocket subscription tasks for each DID using the runtime handle
106 let rt_handle = rt.handle().clone();
107 for (did, pds) in did_pds_pairs {
108 let inodes_clone = Arc::clone(&inodes_arc);
109 let sizes_clone = Arc::clone(&sizes_arc);
110 let content_cache_clone = Arc::clone(&content_cache_arc);
111 let notifier_clone = notifier.clone();
112
113 rt_handle.spawn(async move {
114 if let Err(e) = firehose::subscribe_to_repo::<atrium_repo::blockstore::CarStore<std::io::Cursor<Vec<u8>>>>(
115 did,
116 pds,
117 inodes_clone,
118 sizes_clone,
119 content_cache_clone,
120 notifier_clone,
121 ).await {
122 eprintln!("WebSocket error: {:?}", e);
123 }
124 });
125 }
126
127 println!("mounted at {mountpoint:?}");
128 print!("hit enter to unmount and exit...");
129 std::io::stdout().flush().unwrap();
130
131 // Wait for user input
132 let mut input = String::new();
133 std::io::stdin().read_line(&mut input).unwrap();
134
135 println!("unmounted {mountpoint:?}");
136}
137
138async fn cached_download(
139 id: &ResolvedIdentity,
140 m: &MultiProgress,
141) -> Result<Vec<u8>, error::Error> {
142 let mut pb = ProgressBar::new_spinner();
143 pb.set_style(
144 ProgressStyle::default_spinner()
145 .template("{spinner:.green} [{elapsed_precise}] {msg}")
146 .unwrap()
147 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
148 );
149 pb.enable_steady_tick(std::time::Duration::from_millis(100));
150 pb = m.add(pb);
151
152 // Always download fresh - no caching for now to ensure up-to-date data
153 pb.set_message(format!("downloading CAR file for...{}", id.did));
154 let bytes = download_car_file(id, &pb).await?;
155
156 pb.finish();
157 Ok(bytes)
158}
159
160async fn download_car_file(
161 id: &ResolvedIdentity,
162 pb: &ProgressBar,
163) -> Result<Vec<u8>, error::Error> {
164 // download the entire car file first before mounting it as a fusefs
165 let client = AtpServiceClient::new(IsahcClient::new(&id.pds));
166 let did = types::string::Did::new(id.did.clone()).unwrap();
167
168 let bytes = client
169 .service
170 .com
171 .atproto
172 .sync
173 .get_repo(com::atproto::sync::get_repo::Parameters::from(
174 com::atproto::sync::get_repo::ParametersData { did, since: None },
175 ))
176 .await?;
177
178 pb.finish_with_message(format!("download complete for \t...\t{}", id.did));
179
180 Ok(bytes)
181}
182
183async fn build_repo(bytes: Vec<u8>) -> Result<Repository<CarStore<Cursor<Vec<u8>>>>, error::Error> {
184 let store = CarStore::open(Cursor::new(bytes)).await?;
185 let root = store.roots().next().unwrap();
186 let repo = Repository::open(store, root).await?;
187 Ok(repo)
188}