mount an atproto PDS repository as a FUSE filesystem
oppi.li/posts/mounting_the_atmosphere/

Watch live changes from the firehose #11

closed · opened by danabra.mov · targeting main from danabra.mov/pdsfs:firehose

AT URI
at://did:plc:fpruhuo22xkm5o7ttr2ktxdo/sh.tangled.repo.pull/3m34e5c724p22
+80 -506
Interdiff #0 → #1
+70 -82
src/main.rs
··· 1 - mod client; 2 - mod error; 3 - mod firehose; 4 - mod fs; 5 - mod resolver; 6 1 7 2 8 3 ··· 10 5 11 6 12 7 8 + 9 + 10 + 11 + 12 + 13 + 14 + 15 + 16 + 13 - use futures::{StreamExt, stream}; 14 - use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; 15 - use std::{ 16 - io::{Cursor, Write}, 17 17 path::PathBuf, 18 18 sync::Arc, 19 19 }; 20 + use xdg::BaseDirectories; 20 21 21 22 fn main() { 22 23 let rt = tokio::runtime::Runtime::new().unwrap(); ··· 55 56 56 57 57 58 58 - let id = r.resolve(&h).await?; 59 - let bytes = cached_download(&id, &b).await?; 60 - let repo = build_repo(bytes).await?; 61 - Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo)) 62 - } 63 - }) 64 - .collect::<Vec<_>>(), 65 59 66 60 67 - for e in errors { 68 - eprintln!("{:?}", e.as_ref().unwrap_err()); 69 - } 70 - let repos_with_pds: Vec<_> = success 71 - .into_iter() 72 - .map(|s| s.unwrap()) 73 - .collect(); 74 - 75 - // construct the fs 76 - let mut fs = fs::PdsFs::new(); 77 - 78 - // Extract (did, pds) pairs for WebSocket tasks before consuming repos 79 - let did_pds_pairs: Vec<_> = repos_with_pds.iter() 80 - .map(|(did, pds, _)| (did.clone(), pds.clone())) 81 - .collect(); 82 - 83 - // Consume repos_with_pds to add repos to filesystem 84 - for (did, _, repo) in repos_with_pds { 85 - rt.block_on(fs.add(did, repo)) 86 - } 87 61 88 - // get shared state for WebSocket tasks 89 - let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state(); 90 62 91 - // mount 92 - let options = vec![ 93 - MountOption::RO, 94 - 95 - 96 - MountOption::CUSTOM("local".to_string()), 97 - MountOption::CUSTOM("volname=pdsfs".to_string()), 98 - ]; 99 - 100 - // Create session and get notifier for Finder refresh 101 - let session = fuser::Session::new(fs, &mountpoint, &options).unwrap(); 102 - let notifier = session.notifier(); 103 - let _bg = session.spawn().unwrap(); 104 - 105 - // spawn WebSocket subscription tasks for each DID using the runtime handle 106 - let rt_handle = rt.handle().clone(); 107 - for (did, pds) in did_pds_pairs { 108 - let inodes_clone = Arc::clone(&inodes_arc); 109 - let sizes_clone = Arc::clone(&sizes_arc); 110 - let content_cache_clone = Arc::clone(&content_cache_arc); 111 - let notifier_clone = notifier.clone(); 112 - 113 - rt_handle.spawn(async move { 114 - if let Err(e) = firehose::subscribe_to_repo::<atrium_repo::blockstore::CarStore<std::io::Cursor<Vec<u8>>>>( 115 - did, 116 - pds, 117 - inodes_clone, 118 - sizes_clone, 119 - content_cache_clone, 120 - notifier_clone, 121 - ).await { 122 - eprintln!("WebSocket error: {:?}", e); 123 - } 124 - }); 125 - } 126 63 127 - println!("mounted at {mountpoint:?}"); 128 - print!("hit enter to unmount and exit..."); 129 64 130 65 131 66 132 - let mut input = String::new(); 133 - std::io::stdin().read_line(&mut input).unwrap(); 134 67 68 + 69 + 70 + 71 + 72 + 73 + 74 + 75 + 76 + 77 + 78 + 79 + 80 + 81 + 82 + 83 + 84 + 85 + 86 + 87 + 88 + 89 + 90 + 91 + 92 + 93 + 94 + 95 + 96 + 97 + 98 + 99 + 100 + 101 + 102 + 103 + 104 + 135 - println!("unmounted {mountpoint:?}"); 136 - } 137 105 138 106 139 107 ··· 149 117 pb.enable_steady_tick(std::time::Duration::from_millis(100)); 150 118 pb = m.add(pb); 151 119 120 + let dirs = BaseDirectories::new(); 121 + 122 + let dir = dirs 123 + .get_cache_home() 124 + .expect("$HOME is absent") 125 + .join("pdsfs"); 126 + tokio::fs::create_dir_all(&dir).await?; 127 + 128 + let file = dir.join(&id.did); 129 + let exists = std::fs::exists(&file)?; 130 + 131 + let bytes = if !exists { 132 + 
pb.set_message(format!("downloading CAR file for...{}", id.did)); 133 + download_car_file(id, &pb).await? 134 + } else { 135 + pb.set_message(format!("using cached CAR file for...{}", id.did)); 136 + tokio::fs::read(&file).await? 137 + }; 138 + 139 + // write to disk 140 + if !exists { 141 + tokio::fs::write(&file, &bytes).await?; 142 + } 152 - // Always download fresh - no caching for now to ensure up-to-date data 153 - pb.set_message(format!("downloading CAR file for...{}", id.did)); 154 - let bytes = download_car_file(id, &pb).await?; 155 143 156 144 pb.finish(); 157 145 Ok(bytes)
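Note on the hunk above: round #1 brings back the on-disk CAR cache (round #0 had a "disable cache" commit). CAR files land under the XDG cache directory at ~/.cache/pdsfs/<did> and are only downloaded when no cached copy exists. A minimal sketch of that logic, with car_cache_path and cached_car as hypothetical stand-ins for the real cached_download/download_car_file helpers in main.rs:

use std::path::PathBuf;
use xdg::BaseDirectories;

// where a repo's CAR file is cached on disk: ~/.cache/pdsfs/<did>
// (the "pdsfs" directory name comes from the diff above)
async fn car_cache_path(did: &str) -> std::io::Result<PathBuf> {
    let dir = BaseDirectories::new()
        .get_cache_home()
        .expect("$HOME is absent")
        .join("pdsfs");
    tokio::fs::create_dir_all(&dir).await?;
    Ok(dir.join(did))
}

// read the cached CAR if present, otherwise download and persist it.
// `download` stands in for the real download_car_file call.
async fn cached_car<F, Fut>(did: &str, download: F) -> std::io::Result<Vec<u8>>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = std::io::Result<Vec<u8>>>,
{
    let file = car_cache_path(did).await?;
    if std::fs::exists(&file)? {
        return tokio::fs::read(&file).await;
    }
    let bytes = download().await?;
    tokio::fs::write(&file, &bytes).await?; // cache for the next mount
    Ok(bytes)
}

As far as this hunk shows, the cache is keyed by DID alone with no invalidation, so a stale CAR lingers until the file under ~/.cache/pdsfs/ is deleted by hand.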
+7 -101
Cargo.lock
··· 97 97 "windows-sys 0.59.0", 98 98 ] 99 99 100 - [[package]] 101 - name = "anyhow" 102 - version = "1.0.100" 103 - source = "registry+https://github.com/rust-lang/crates.io-index" 104 - checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" 105 - 106 100 [[package]] 107 101 name = "async-channel" 108 102 version = "1.9.0" ··· 334 328 source = "registry+https://github.com/rust-lang/crates.io-index" 335 329 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 336 330 337 - [[package]] 338 - name = "byteorder" 339 - version = "1.5.0" 340 - source = "registry+https://github.com/rust-lang/crates.io-index" 341 - checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" 342 - 343 331 [[package]] 344 332 name = "bytes" 345 333 version = "1.10.1" ··· 681 669 checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" 682 670 dependencies = [ 683 671 "libc", 672 + "windows-sys 0.59.0", 684 - "windows-sys 0.60.2", 685 673 ] 686 674 687 675 [[package]] ··· 1003 991 "idna", 1004 992 "ipnet", 1005 993 "once_cell", 994 + "rand", 1006 - "rand 0.9.2", 1007 995 "ring", 1008 996 "thiserror 2.0.12", 1009 997 "tinyvec", ··· 1025 1013 "moka", 1026 1014 "once_cell", 1027 1015 "parking_lot", 1016 + "rand", 1028 - "rand 0.9.2", 1029 1017 "resolv-conf", 1030 1018 "smallvec", 1031 1019 "thiserror 2.0.12", ··· 1769 1757 name = "pdsfs" 1770 1758 version = "0.1.0" 1771 1759 dependencies = [ 1772 - "anyhow", 1773 1760 "atrium-api", 1774 1761 "atrium-common", 1775 1762 "atrium-identity", ··· 1789 1776 "serde_json", 1790 1777 "thiserror 2.0.12", 1791 1778 "tokio", 1792 - "tokio-tungstenite", 1793 1779 "xdg", 1794 1780 ] 1795 1781 ··· 1901 1887 source = "registry+https://github.com/rust-lang/crates.io-index" 1902 1888 checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" 1903 1889 1904 - [[package]] 1905 - name = "rand" 1906 - version = "0.8.5" 1907 - source = "registry+https://github.com/rust-lang/crates.io-index" 1908 - checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" 1909 - dependencies = [ 1910 - "libc", 1911 - "rand_chacha 0.3.1", 1912 - "rand_core 0.6.4", 1913 - ] 1914 - 1915 1890 [[package]] 1916 1891 name = "rand" 1917 1892 version = "0.9.2" 1918 1893 source = "registry+https://github.com/rust-lang/crates.io-index" 1919 1894 checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" 1920 1895 dependencies = [ 1896 + "rand_chacha", 1897 + "rand_core", 1921 - "rand_chacha 0.9.0", 1922 - "rand_core 0.9.3", 1923 - ] 1924 - 1925 - [[package]] 1926 - name = "rand_chacha" 1927 - version = "0.3.1" 1928 - source = "registry+https://github.com/rust-lang/crates.io-index" 1929 - checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" 1930 - dependencies = [ 1931 - "ppv-lite86", 1932 - "rand_core 0.6.4", 1933 1898 ] 1934 1899 1935 1900 [[package]] ··· 1939 1904 checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" 1940 1905 dependencies = [ 1941 1906 "ppv-lite86", 1907 + "rand_core", 1942 - "rand_core 0.9.3", 1943 - ] 1944 - 1945 - [[package]] 1946 - name = "rand_core" 1947 - version = "0.6.4" 1948 - source = "registry+https://github.com/rust-lang/crates.io-index" 1949 - checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" 1950 - dependencies = [ 1951 - "getrandom 0.2.16", 1952 1908 ] 1953 1909 1954 1910 [[package]] ··· 2101 2057 "errno", 2102 2058 "libc", 2103 2059 "linux-raw-sys", 2060 + 
"windows-sys 0.59.0", 2104 - "windows-sys 0.60.2", 2105 2061 ] 2106 2062 2107 2063 [[package]] ··· 2277 2233 "serde", 2278 2234 ] 2279 2235 2280 - [[package]] 2281 - name = "sha1" 2282 - version = "0.10.6" 2283 - source = "registry+https://github.com/rust-lang/crates.io-index" 2284 - checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" 2285 - dependencies = [ 2286 - "cfg-if", 2287 - "cpufeatures", 2288 - "digest", 2289 - ] 2290 - 2291 2236 [[package]] 2292 2237 name = "sha2" 2293 2238 version = "0.10.9" ··· 2569 2514 "tokio", 2570 2515 ] 2571 2516 2572 - [[package]] 2573 - name = "tokio-tungstenite" 2574 - version = "0.24.0" 2575 - source = "registry+https://github.com/rust-lang/crates.io-index" 2576 - checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" 2577 - dependencies = [ 2578 - "futures-util", 2579 - "log", 2580 - "native-tls", 2581 - "tokio", 2582 - "tokio-native-tls", 2583 - "tungstenite", 2584 - ] 2585 - 2586 2517 [[package]] 2587 2518 name = "tokio-util" 2588 2519 version = "0.7.15" ··· 2731 2662 source = "registry+https://github.com/rust-lang/crates.io-index" 2732 2663 checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" 2733 2664 2734 - [[package]] 2735 - name = "tungstenite" 2736 - version = "0.24.0" 2737 - source = "registry+https://github.com/rust-lang/crates.io-index" 2738 - checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" 2739 - dependencies = [ 2740 - "byteorder", 2741 - "bytes", 2742 - "data-encoding", 2743 - "http 1.3.1", 2744 - "httparse", 2745 - "log", 2746 - "native-tls", 2747 - "rand 0.8.5", 2748 - "sha1", 2749 - "thiserror 1.0.69", 2750 - "utf-8", 2751 - ] 2752 - 2753 2665 [[package]] 2754 2666 name = "typenum" 2755 2667 version = "1.18.0" ··· 2801 2713 "percent-encoding", 2802 2714 ] 2803 2715 2804 - [[package]] 2805 - name = "utf-8" 2806 - version = "0.7.6" 2807 - source = "registry+https://github.com/rust-lang/crates.io-index" 2808 - checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" 2809 - 2810 2716 [[package]] 2811 2717 name = "utf8_iter" 2812 2718 version = "1.0.4"
+2 -4
Cargo.toml
···
4 4 edition = "2024"
5 5
6 6 [dependencies]
7 - anyhow = "1.0"
8 7 atrium-api = "0.25.4"
9 8 atrium-common = "0.1.2"
10 9 atrium-identity = "0.1.5"
···
12 11 atrium-xrpc = "0.12.3"
13 12 atrium-xrpc-client = { version = "0.5.14", features=["isahc"] }
14 13 clap = { version = "4.5.41", features = ["cargo"] }
14 + fuser = "0.15.1"
15 - fuser = { version = "0.15.1", features = ["abi-7-18"] }
16 15 futures = "0.3.31"
17 16 hickory-resolver = "0.25.2"
18 17 indexmap = "2.10.0"
···
23 22 serde_ipld_dagcbor = "0.6.3"
24 23 serde_json = "1.0.141"
25 24 thiserror = "2.0.12"
25 + tokio = { version = "1.46.1", features = ["fs"] }
26 - tokio = { version = "1.46.1", features = ["fs", "sync", "rt-multi-thread"] }
27 - tokio-tungstenite = { version = "0.24", features = ["native-tls"] }
28 26 xdg = "3.0.0"
-318
src/firehose.rs
··· 1 - use anyhow::{anyhow, Result}; 2 - use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID}; 3 - use atrium_api::client::AtpServiceClient; 4 - use atrium_api::com; 5 - use atrium_api::types; 6 - use atrium_xrpc_client::isahc::IsahcClient; 7 - use futures::StreamExt; 8 - use ipld_core::ipld::Ipld; 9 - use std::io::Cursor; 10 - use std::sync::{Arc, Mutex}; 11 - use tokio_tungstenite::connect_async; 12 - use tokio_tungstenite::tungstenite::Message; 13 - 14 - use crate::fs::{PdsFsCollection, PdsFsEntry, PdsFsRecord}; 15 - use indexmap::{IndexMap, IndexSet}; 16 - 17 - /// Frame header types for WebSocket messages 18 - #[derive(Debug, Clone, PartialEq, Eq)] 19 - enum FrameHeader { 20 - Message(Option<String>), 21 - Error, 22 - } 23 - 24 - impl TryFrom<Ipld> for FrameHeader { 25 - type Error = anyhow::Error; 26 - 27 - fn try_from(value: Ipld) -> Result<Self> { 28 - if let Ipld::Map(map) = value { 29 - if let Some(Ipld::Integer(i)) = map.get("op") { 30 - match i { 31 - 1 => { 32 - let t = if let Some(Ipld::String(s)) = map.get("t") { 33 - Some(s.clone()) 34 - } else { 35 - None 36 - }; 37 - return Ok(FrameHeader::Message(t)); 38 - } 39 - -1 => return Ok(FrameHeader::Error), 40 - _ => {} 41 - } 42 - } 43 - } 44 - Err(anyhow!("invalid frame type")) 45 - } 46 - } 47 - 48 - /// Frame types for parsed WebSocket messages 49 - #[derive(Debug, Clone, PartialEq, Eq)] 50 - pub enum Frame { 51 - Message(Option<String>, MessageFrame), 52 - Error(ErrorFrame), 53 - } 54 - 55 - #[derive(Debug, Clone, PartialEq, Eq)] 56 - pub struct MessageFrame { 57 - pub body: Vec<u8>, 58 - } 59 - 60 - #[derive(Debug, Clone, PartialEq, Eq)] 61 - pub struct ErrorFrame {} 62 - 63 - impl TryFrom<&[u8]> for Frame { 64 - type Error = anyhow::Error; 65 - 66 - fn try_from(value: &[u8]) -> Result<Self> { 67 - let mut cursor = Cursor::new(value); 68 - let (left, right) = match serde_ipld_dagcbor::from_reader::<Ipld, _>(&mut cursor) { 69 - Err(serde_ipld_dagcbor::DecodeError::TrailingData) => { 70 - value.split_at(cursor.position() as usize) 71 - } 72 - _ => { 73 - return Err(anyhow!("invalid frame type")); 74 - } 75 - }; 76 - let header = FrameHeader::try_from(serde_ipld_dagcbor::from_slice::<Ipld>(left)?)?; 77 - if let FrameHeader::Message(t) = &header { 78 - Ok(Frame::Message(t.clone(), MessageFrame { body: right.to_vec() })) 79 - } else { 80 - Ok(Frame::Error(ErrorFrame {})) 81 - } 82 - } 83 - } 84 - 85 - /// Subscribe to a repo's firehose and update inodes on changes 86 - pub async fn subscribe_to_repo<R>( 87 - did: String, 88 - pds: String, 89 - inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>, 90 - sizes: Arc<Mutex<IndexMap<usize, u64>>>, 91 - content_cache: Arc<Mutex<IndexMap<String, String>>>, 92 - notifier: fuser::Notifier, 93 - ) -> Result<()> 94 - where 95 - R: atrium_repo::blockstore::AsyncBlockStoreRead, 96 - { 97 - // Strip https:// or http:// prefix from PDS URL if present 98 - let pds_host = pds.trim_start_matches("https://").trim_start_matches("http://"); 99 - let url = format!("wss://{}/xrpc/{}", pds_host, NSID); 100 - println!("Connecting to firehose: {}", url); 101 - 102 - let (mut stream, _) = connect_async(url).await?; 103 - println!("Connected to firehose for {}", did); 104 - 105 - loop { 106 - match stream.next().await { 107 - Some(Ok(Message::Binary(data))) => { 108 - if let Ok(Frame::Message(Some(t), msg)) = Frame::try_from(data.as_slice()) { 109 - if t.as_str() == "#commit" { 110 - if let Ok(commit) = serde_ipld_dagcbor::from_reader::<Commit, _>(msg.body.as_slice()) { 111 - // Only process commits 
for our DID 112 - if commit.repo.as_str() == did { 113 - if let Err(e) = handle_commit(&commit, &inodes, &sizes, &content_cache, &did, &pds, &notifier).await { 114 - eprintln!("Error handling commit: {:?}", e); 115 - } 116 - } 117 - } 118 - } 119 - } 120 - } 121 - Some(Ok(_)) => {} // Ignore other message types 122 - Some(Err(e)) => { 123 - eprintln!("WebSocket error: {}", e); 124 - break; 125 - } 126 - None => { 127 - eprintln!("WebSocket closed"); 128 - break; 129 - } 130 - } 131 - } 132 - 133 - Ok(()) 134 - } 135 - 136 - /// Handle a commit by updating the inode tree and notifying Finder 137 - async fn handle_commit( 138 - commit: &Commit, 139 - inodes: &Arc<Mutex<IndexSet<PdsFsEntry>>>, 140 - sizes: &Arc<Mutex<IndexMap<usize, u64>>>, 141 - content_cache: &Arc<Mutex<IndexMap<String, String>>>, 142 - did: &str, 143 - pds: &str, 144 - notifier: &fuser::Notifier, 145 - ) -> Result<()> { 146 - // Find the DID inode 147 - let did_entry = PdsFsEntry::Did(did.to_string()); 148 - let did_inode = { 149 - let inodes_lock = inodes.lock().unwrap(); 150 - inodes_lock.get_index_of(&did_entry) 151 - }; 152 - 153 - let Some(did_inode) = did_inode else { 154 - return Err(anyhow!("DID not found in inodes")); 155 - }; 156 - 157 - for op in &commit.ops { 158 - let Some((collection, rkey)) = op.path.split_once('/') else { 159 - continue; 160 - }; 161 - 162 - match op.action.as_str() { 163 - "create" => { 164 - // Fetch the record from PDS 165 - let record_key = format!("{}/{}", collection, rkey); 166 - let cache_key = format!("{}/{}", did, record_key); 167 - 168 - // Fetch record content from PDS 169 - match fetch_record(pds, did, collection, rkey).await { 170 - Ok(content) => { 171 - let content_len = content.len() as u64; 172 - 173 - // Add the record to inodes 174 - let (collection_inode, record_inode) = { 175 - let mut inodes_lock = inodes.lock().unwrap(); 176 - 177 - // Ensure collection exists 178 - let collection_entry = PdsFsEntry::Collection(PdsFsCollection { 179 - parent: did_inode, 180 - nsid: collection.to_string(), 181 - }); 182 - let (collection_inode, _) = inodes_lock.insert_full(collection_entry); 183 - 184 - // Add the record 185 - let record_entry = PdsFsEntry::Record(PdsFsRecord { 186 - parent: collection_inode, 187 - rkey: rkey.to_string(), 188 - }); 189 - let (record_inode, _) = inodes_lock.insert_full(record_entry); 190 - (collection_inode, record_inode) 191 - }; 192 - 193 - // Cache the content and size 194 - content_cache.lock().unwrap().insert(cache_key, content); 195 - sizes.lock().unwrap().insert(record_inode, content_len); 196 - 197 - // Notify Finder about the new file (release lock first) 198 - let filename = format!("{}.json", rkey); 199 - if let Err(e) = notifier.inval_entry(collection_inode as u64, filename.as_ref()) { 200 - eprintln!("Failed to invalidate entry for {}: {}", filename, e); 201 - } 202 - 203 - println!("Created: {}/{}", collection, rkey); 204 - } 205 - Err(e) => { 206 - eprintln!("Failed to fetch record {}/{}: {}", collection, rkey, e); 207 - } 208 - } 209 - } 210 - "delete" => { 211 - // Get inodes before removing 212 - let (collection_inode_opt, child_inode_opt) = { 213 - let mut inodes_lock = inodes.lock().unwrap(); 214 - 215 - // Find the collection 216 - let collection_entry = PdsFsEntry::Collection(PdsFsCollection { 217 - parent: did_inode, 218 - nsid: collection.to_string(), 219 - }); 220 - let collection_inode = inodes_lock.get_index_of(&collection_entry); 221 - 222 - // Find and remove the record 223 - let child_inode = if let Some(coll_ino) = 
collection_inode { 224 - let record_entry = PdsFsEntry::Record(PdsFsRecord { 225 - parent: coll_ino, 226 - rkey: rkey.to_string(), 227 - }); 228 - let child_ino = inodes_lock.get_index_of(&record_entry); 229 - inodes_lock.shift_remove(&record_entry); 230 - child_ino 231 - } else { 232 - None 233 - }; 234 - 235 - (collection_inode, child_inode) 236 - }; 237 - 238 - // Notify Finder about the deletion (release lock first) 239 - if let (Some(coll_ino), Some(child_ino)) = (collection_inode_opt, child_inode_opt) { 240 - // Remove from caches 241 - sizes.lock().unwrap().shift_remove(&child_ino); 242 - let cache_key = format!("{}/{}/{}", did, collection, rkey); 243 - content_cache.lock().unwrap().shift_remove(&cache_key); 244 - 245 - let filename = format!("{}.json", rkey); 246 - if let Err(e) = notifier.delete(coll_ino as u64, child_ino as u64, filename.as_ref()) { 247 - eprintln!("Failed to notify deletion for {}: {}", filename, e); 248 - } 249 - } 250 - 251 - println!("Deleted: {}/{}", collection, rkey); 252 - } 253 - "update" => { 254 - // For updates, invalidate the inode so content is re-fetched 255 - let record_inode_opt = { 256 - let inodes_lock = inodes.lock().unwrap(); 257 - let collection_entry = PdsFsEntry::Collection(PdsFsCollection { 258 - parent: did_inode, 259 - nsid: collection.to_string(), 260 - }); 261 - 262 - if let Some(collection_inode) = inodes_lock.get_index_of(&collection_entry) { 263 - let record_entry = PdsFsEntry::Record(PdsFsRecord { 264 - parent: collection_inode, 265 - rkey: rkey.to_string(), 266 - }); 267 - inodes_lock.get_index_of(&record_entry) 268 - } else { 269 - None 270 - } 271 - }; 272 - 273 - // Notify Finder to invalidate the inode (release lock first) 274 - if let Some(record_ino) = record_inode_opt { 275 - // Clear caches so content is recalculated 276 - sizes.lock().unwrap().shift_remove(&record_ino); 277 - let cache_key = format!("{}/{}/{}", did, collection, rkey); 278 - content_cache.lock().unwrap().shift_remove(&cache_key); 279 - 280 - // Invalidate the entire inode (metadata and all data) 281 - if let Err(e) = notifier.inval_inode(record_ino as u64, 0, 0) { 282 - eprintln!("Failed to invalidate inode for {}/{}: {}", collection, rkey, e); 283 - } 284 - } 285 - 286 - println!("Updated: {}/{}", collection, rkey); 287 - } 288 - _ => {} 289 - } 290 - } 291 - 292 - Ok(()) 293 - } 294 - 295 - /// Fetch a record from the PDS 296 - async fn fetch_record(pds: &str, did: &str, collection: &str, rkey: &str) -> Result<String> { 297 - let client = AtpServiceClient::new(IsahcClient::new(pds)); 298 - let did = types::string::Did::new(did.to_string()).map_err(|e| anyhow!(e))?; 299 - let collection_nsid = types::string::Nsid::new(collection.to_string()).map_err(|e| anyhow!(e))?; 300 - let record_key = types::string::RecordKey::new(rkey.to_string()).map_err(|e| anyhow!(e))?; 301 - 302 - let response = client 303 - .service 304 - .com 305 - .atproto 306 - .repo 307 - .get_record(com::atproto::repo::get_record::Parameters::from( 308 - com::atproto::repo::get_record::ParametersData { 309 - cid: None, 310 - collection: collection_nsid, 311 - repo: types::string::AtIdentifier::Did(did), 312 - rkey: record_key, 313 - } 314 - )) 315 - .await?; 316 - 317 - Ok(serde_json::to_string_pretty(&response.value)?) 318 - }
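For readers skimming the 318 deleted lines above: src/firehose.rs opened a WebSocket to the repo's own PDS at /xrpc/com.atproto.sync.subscribeRepos, split each binary frame into a dag-cbor header and body, and on "#commit" events for the mounted DID updated the inode, size, and content caches and poked fuser's Notifier. A condensed sketch of that loop, assuming the dependencies round #0 used (tokio-tungstenite 0.24, futures, serde_ipld_dagcbor, ipld_core, atrium-api) and with the inode/notifier plumbing replaced by a println!:

use futures::StreamExt;
use ipld_core::ipld::Ipld;
use std::io::Cursor;
use tokio_tungstenite::{connect_async, tungstenite::Message};

type Commit = atrium_api::com::atproto::sync::subscribe_repos::Commit;

async fn watch_repo(pds_host: &str, did: &str) -> Result<(), Box<dyn std::error::Error>> {
    // same endpoint the deleted module dialed
    let url = format!("wss://{pds_host}/xrpc/com.atproto.sync.subscribeRepos");
    let (mut stream, _resp) = connect_async(url).await?;

    while let Some(msg) = stream.next().await {
        let Message::Binary(data) = msg? else { continue };

        // each frame is two concatenated dag-cbor values: a header, then a body.
        // decoding the first value leaves the cursor at the split point.
        let mut cursor = Cursor::new(data.as_slice());
        let split = match serde_ipld_dagcbor::from_reader::<Ipld, _>(&mut cursor) {
            Err(serde_ipld_dagcbor::DecodeError::TrailingData) => cursor.position() as usize,
            _ => continue, // not a header+body frame
        };
        let (header_bytes, body) = data.split_at(split);

        // only react to { "op": 1, "t": "#commit" } frames
        let header: Ipld = serde_ipld_dagcbor::from_slice(header_bytes)?;
        let Ipld::Map(header) = header else { continue };
        if !matches!(header.get("op"), Some(Ipld::Integer(1)))
            || !matches!(header.get("t"), Some(Ipld::String(t)) if t.as_str() == "#commit")
        {
            continue;
        }

        let commit: Commit = serde_ipld_dagcbor::from_reader(body)?;
        if commit.repo.as_str() == did {
            // round #0 updated the inode/size/content maps here and poked
            // fuser's Notifier so Finder would pick up the change
            for op in &commit.ops {
                println!("{} {}", op.action, op.path);
            }
        }
    }
    Ok(())
}

The TrailingData trick comes straight from the deleted code: since the header and body are two back-to-back dag-cbor values, the failed first decode is only used to learn where the header ends.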
+1 -1
src/fs.rs
···
473 473 let mut repos = self.repos.lock().unwrap();
474 474 let repo = &mut repos[&did];
475 475 if let Ok(Some(val)) = self.rt.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) {
476 - reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]);
476 + reply.data(&serde_json::to_string_pretty(&val).unwrap().as_bytes()[offset as usize..]);
477 477 return;
478 478 } else {
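The single-line change above swaps serde_json::to_string for to_string_pretty in the FUSE read handler, so records read back as indented JSON instead of one long line (the round #1 commit "pretty print json"). Illustrative only, with a made-up record value:

fn main() -> serde_json::Result<()> {
    let record = serde_json::json!({ "$type": "app.bsky.feed.post", "text": "hi" });
    // before: one long line per record
    println!("{}", serde_json::to_string(&record)?);
    // after: indented, one field per line, which is what `cat` now prints
    println!("{}", serde_json::to_string_pretty(&record)?);
    Ok(())
}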

Submissions

danabra.mov submitted #1
1 commit
pretty print json
danabra.mov

fml this is too hard

danabra.mov

let me just resubmit everything as a stack

closed without merging
danabra.mov submitted #0
4 commits
show files as .json
initialize tokio once
disable cache
watch firehose
danabra.mov

i guess "firehose" is inaccurate, the code just watches the pds. but whatever
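
On the "firehose" naming: the deleted code dialed the repo's own PDS rather than a relay, but both ends speak the same com.atproto.sync.subscribeRepos protocol; the practical difference is which host you connect to and how much filtering happens client-side. A hypothetical comparison (pds.example.com is a placeholder host; bsky.network is the Bluesky relay):

// both endpoints speak com.atproto.sync.subscribeRepos; only the host differs
fn subscribe_url(host: &str) -> String {
    format!("wss://{host}/xrpc/com.atproto.sync.subscribeRepos")
}

fn main() {
    // what round #0 did: dial the repo's own PDS, so only that PDS's repos arrive
    println!("{}", subscribe_url("pds.example.com"));
    // the actual firehose: a relay rebroadcasting commits from every PDS it crawls,
    // which a consumer then has to filter by DID
    println!("{}", subscribe_url("bsky.network"));
}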