mount an atproto PDS repository as a FUSE filesystem oppi.li/posts/mounting_the_atmosphere/

Watch live changes from the firehose #11

closed opened by danabra.mov targeting main from danabra.mov/pdsfs: firehose
Labels

None yet.

Participants 1
AT URI
at://did:plc:fpruhuo22xkm5o7ttr2ktxdo/sh.tangled.repo.pull/3m34e5c724p22
+505 -79
Diff #0
+82 -70
src/main.rs
··· 1 2 3 ··· 5 6 7 8 - 9 - 10 - 11 - 12 - 13 - 14 - 15 - 16 - 17 path::PathBuf, 18 sync::Arc, 19 }; 20 - use xdg::BaseDirectories; 21 22 fn main() { 23 let rt = tokio::runtime::Runtime::new().unwrap(); ··· 56 57 58 59 60 61 62 63 64 65 66 67 68 - 69 - 70 - 71 - 72 - 73 - 74 - 75 - 76 - 77 - 78 - 79 - 80 - 81 - 82 - 83 - 84 - 85 - 86 - 87 - 88 - 89 - 90 - 91 - 92 - 93 - 94 - 95 - 96 - 97 - 98 - 99 - 100 - 101 - 102 - 103 - 104 - 105 106 107 ··· 117 pb.enable_steady_tick(std::time::Duration::from_millis(100)); 118 pb = m.add(pb); 119 120 - let dirs = BaseDirectories::new(); 121 - 122 - let dir = dirs 123 - .get_cache_home() 124 - .expect("$HOME is absent") 125 - .join("pdsfs"); 126 - tokio::fs::create_dir_all(&dir).await?; 127 - 128 - let file = dir.join(&id.did); 129 - let exists = std::fs::exists(&file)?; 130 - 131 - let bytes = if !exists { 132 - pb.set_message(format!("downloading CAR file for...{}", id.did)); 133 - download_car_file(id, &pb).await? 134 - } else { 135 - pb.set_message(format!("using cached CAR file for...{}", id.did)); 136 - tokio::fs::read(&file).await? 137 - }; 138 - 139 - // write to disk 140 - if !exists { 141 - tokio::fs::write(&file, &bytes).await?; 142 - } 143 144 pb.finish(); 145 Ok(bytes)
··· 1 + mod client; 2 + mod error; 3 + mod firehose; 4 + mod fs; 5 + mod resolver; 6 7 8 ··· 10 11 12 13 + use futures::{StreamExt, stream}; 14 + use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; 15 + use std::{ 16 + io::{Cursor, Write}, 17 path::PathBuf, 18 sync::Arc, 19 }; 20 21 fn main() { 22 let rt = tokio::runtime::Runtime::new().unwrap(); ··· 55 56 57 58 + let id = r.resolve(&h).await?; 59 + let bytes = cached_download(&id, &b).await?; 60 + let repo = build_repo(bytes).await?; 61 + Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo)) 62 + } 63 + }) 64 + .collect::<Vec<_>>(), 65 66 67 + for e in errors { 68 + eprintln!("{:?}", e.as_ref().unwrap_err()); 69 + } 70 + let repos_with_pds: Vec<_> = success 71 + .into_iter() 72 + .map(|s| s.unwrap()) 73 + .collect(); 74 + 75 + // construct the fs 76 + let mut fs = fs::PdsFs::new(); 77 + 78 + // Extract (did, pds) pairs for WebSocket tasks before consuming repos 79 + let did_pds_pairs: Vec<_> = repos_with_pds.iter() 80 + .map(|(did, pds, _)| (did.clone(), pds.clone())) 81 + .collect(); 82 + 83 + // Consume repos_with_pds to add repos to filesystem 84 + for (did, _, repo) in repos_with_pds { 85 + rt.block_on(fs.add(did, repo)) 86 + } 87 88 + // get shared state for WebSocket tasks 89 + let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state(); 90 91 + // mount 92 + let options = vec![ 93 + MountOption::RO, 94 + 95 + 96 + MountOption::CUSTOM("local".to_string()), 97 + MountOption::CUSTOM("volname=pdsfs".to_string()), 98 + ]; 99 + 100 + // Create session and get notifier for Finder refresh 101 + let session = fuser::Session::new(fs, &mountpoint, &options).unwrap(); 102 + let notifier = session.notifier(); 103 + let _bg = session.spawn().unwrap(); 104 + 105 + // spawn WebSocket subscription tasks for each DID using the runtime handle 106 + let rt_handle = rt.handle().clone(); 107 + for (did, pds) in did_pds_pairs { 108 + let inodes_clone = Arc::clone(&inodes_arc); 109 + let sizes_clone = Arc::clone(&sizes_arc); 110 + let content_cache_clone = Arc::clone(&content_cache_arc); 111 + let notifier_clone = notifier.clone(); 112 + 113 + rt_handle.spawn(async move { 114 + if let Err(e) = firehose::subscribe_to_repo::<atrium_repo::blockstore::CarStore<std::io::Cursor<Vec<u8>>>>( 115 + did, 116 + pds, 117 + inodes_clone, 118 + sizes_clone, 119 + content_cache_clone, 120 + notifier_clone, 121 + ).await { 122 + eprintln!("WebSocket error: {:?}", e); 123 + } 124 + }); 125 + } 126 127 + println!("mounted at {mountpoint:?}"); 128 + print!("hit enter to unmount and exit..."); 129 130 131 132 + let mut input = String::new(); 133 + std::io::stdin().read_line(&mut input).unwrap(); 134 135 + println!("unmounted {mountpoint:?}"); 136 + } 137 138 139 ··· 149 pb.enable_steady_tick(std::time::Duration::from_millis(100)); 150 pb = m.add(pb); 151 152 + // Always download fresh - no caching for now to ensure up-to-date data 153 + pb.set_message(format!("downloading CAR file for...{}", id.did)); 154 + let bytes = download_car_file(id, &pb).await?; 155 156 pb.finish(); 157 Ok(bytes)
+101 -7
Cargo.lock
··· 97 "windows-sys 0.59.0", 98 ] 99 100 [[package]] 101 name = "async-channel" 102 version = "1.9.0" ··· 328 source = "registry+https://github.com/rust-lang/crates.io-index" 329 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 330 331 [[package]] 332 name = "bytes" 333 version = "1.10.1" ··· 669 checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" 670 dependencies = [ 671 "libc", 672 - "windows-sys 0.59.0", 673 ] 674 675 [[package]] ··· 991 "idna", 992 "ipnet", 993 "once_cell", 994 - "rand", 995 "ring", 996 "thiserror 2.0.12", 997 "tinyvec", ··· 1013 "moka", 1014 "once_cell", 1015 "parking_lot", 1016 - "rand", 1017 "resolv-conf", 1018 "smallvec", 1019 "thiserror 2.0.12", ··· 1757 name = "pdsfs" 1758 version = "0.1.0" 1759 dependencies = [ 1760 "atrium-api", 1761 "atrium-common", 1762 "atrium-identity", ··· 1776 "serde_json", 1777 "thiserror 2.0.12", 1778 "tokio", 1779 "xdg", 1780 ] 1781 ··· 1887 source = "registry+https://github.com/rust-lang/crates.io-index" 1888 checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" 1889 1890 [[package]] 1891 name = "rand" 1892 version = "0.9.2" 1893 source = "registry+https://github.com/rust-lang/crates.io-index" 1894 checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" 1895 dependencies = [ 1896 - "rand_chacha", 1897 - "rand_core", 1898 ] 1899 1900 [[package]] ··· 1904 checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" 1905 dependencies = [ 1906 "ppv-lite86", 1907 - "rand_core", 1908 ] 1909 1910 [[package]] ··· 2057 "errno", 2058 "libc", 2059 "linux-raw-sys", 2060 - "windows-sys 0.59.0", 2061 ] 2062 2063 [[package]] ··· 2233 "serde", 2234 ] 2235 2236 [[package]] 2237 name = "sha2" 2238 version = "0.10.9" ··· 2514 "tokio", 2515 ] 2516 2517 [[package]] 2518 name = "tokio-util" 2519 version = "0.7.15" ··· 2662 source = "registry+https://github.com/rust-lang/crates.io-index" 2663 checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" 2664 2665 [[package]] 2666 name = "typenum" 2667 version = "1.18.0" ··· 2713 "percent-encoding", 2714 ] 2715 2716 [[package]] 2717 name = "utf8_iter" 2718 version = "1.0.4"
··· 97 "windows-sys 0.59.0", 98 ] 99 100 + [[package]] 101 + name = "anyhow" 102 + version = "1.0.100" 103 + source = "registry+https://github.com/rust-lang/crates.io-index" 104 + checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" 105 + 106 [[package]] 107 name = "async-channel" 108 version = "1.9.0" ··· 334 source = "registry+https://github.com/rust-lang/crates.io-index" 335 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 336 337 + [[package]] 338 + name = "byteorder" 339 + version = "1.5.0" 340 + source = "registry+https://github.com/rust-lang/crates.io-index" 341 + checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" 342 + 343 [[package]] 344 name = "bytes" 345 version = "1.10.1" ··· 681 checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" 682 dependencies = [ 683 "libc", 684 + "windows-sys 0.60.2", 685 ] 686 687 [[package]] ··· 1003 "idna", 1004 "ipnet", 1005 "once_cell", 1006 + "rand 0.9.2", 1007 "ring", 1008 "thiserror 2.0.12", 1009 "tinyvec", ··· 1025 "moka", 1026 "once_cell", 1027 "parking_lot", 1028 + "rand 0.9.2", 1029 "resolv-conf", 1030 "smallvec", 1031 "thiserror 2.0.12", ··· 1769 name = "pdsfs" 1770 version = "0.1.0" 1771 dependencies = [ 1772 + "anyhow", 1773 "atrium-api", 1774 "atrium-common", 1775 "atrium-identity", ··· 1789 "serde_json", 1790 "thiserror 2.0.12", 1791 "tokio", 1792 + "tokio-tungstenite", 1793 "xdg", 1794 ] 1795 ··· 1901 source = "registry+https://github.com/rust-lang/crates.io-index" 1902 checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" 1903 1904 + [[package]] 1905 + name = "rand" 1906 + version = "0.8.5" 1907 + source = "registry+https://github.com/rust-lang/crates.io-index" 1908 + checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" 1909 + dependencies = [ 1910 + "libc", 1911 + "rand_chacha 0.3.1", 1912 + "rand_core 0.6.4", 1913 + ] 1914 + 1915 [[package]] 1916 name = "rand" 1917 version = "0.9.2" 1918 source = "registry+https://github.com/rust-lang/crates.io-index" 1919 checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" 1920 dependencies = [ 1921 + "rand_chacha 0.9.0", 1922 + "rand_core 0.9.3", 1923 + ] 1924 + 1925 + [[package]] 1926 + name = "rand_chacha" 1927 + version = "0.3.1" 1928 + source = "registry+https://github.com/rust-lang/crates.io-index" 1929 + checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" 1930 + dependencies = [ 1931 + "ppv-lite86", 1932 + "rand_core 0.6.4", 1933 ] 1934 1935 [[package]] ··· 1939 checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" 1940 dependencies = [ 1941 "ppv-lite86", 1942 + "rand_core 0.9.3", 1943 + ] 1944 + 1945 + [[package]] 1946 + name = "rand_core" 1947 + version = "0.6.4" 1948 + source = "registry+https://github.com/rust-lang/crates.io-index" 1949 + checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" 1950 + dependencies = [ 1951 + "getrandom 0.2.16", 1952 ] 1953 1954 [[package]] ··· 2101 "errno", 2102 "libc", 2103 "linux-raw-sys", 2104 + "windows-sys 0.60.2", 2105 ] 2106 2107 [[package]] ··· 2277 "serde", 2278 ] 2279 2280 + [[package]] 2281 + name = "sha1" 2282 + version = "0.10.6" 2283 + source = "registry+https://github.com/rust-lang/crates.io-index" 2284 + checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" 2285 + dependencies = [ 2286 + "cfg-if", 2287 + "cpufeatures", 2288 + "digest", 2289 + ] 2290 + 2291 [[package]] 2292 name = "sha2" 2293 version = "0.10.9" ··· 2569 "tokio", 2570 ] 2571 2572 + [[package]] 2573 + name = "tokio-tungstenite" 2574 + version = "0.24.0" 2575 + source = "registry+https://github.com/rust-lang/crates.io-index" 2576 + checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" 2577 + dependencies = [ 2578 + "futures-util", 2579 + "log", 2580 + "native-tls", 2581 + "tokio", 2582 + "tokio-native-tls", 2583 + "tungstenite", 2584 + ] 2585 + 2586 [[package]] 2587 name = "tokio-util" 2588 version = "0.7.15" ··· 2731 source = "registry+https://github.com/rust-lang/crates.io-index" 2732 checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" 2733 2734 + [[package]] 2735 + name = "tungstenite" 2736 + version = "0.24.0" 2737 + source = "registry+https://github.com/rust-lang/crates.io-index" 2738 + checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" 2739 + dependencies = [ 2740 + "byteorder", 2741 + "bytes", 2742 + "data-encoding", 2743 + "http 1.3.1", 2744 + "httparse", 2745 + "log", 2746 + "native-tls", 2747 + "rand 0.8.5", 2748 + "sha1", 2749 + "thiserror 1.0.69", 2750 + "utf-8", 2751 + ] 2752 + 2753 [[package]] 2754 name = "typenum" 2755 version = "1.18.0" ··· 2801 "percent-encoding", 2802 ] 2803 2804 + [[package]] 2805 + name = "utf-8" 2806 + version = "0.7.6" 2807 + source = "registry+https://github.com/rust-lang/crates.io-index" 2808 + checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" 2809 + 2810 [[package]] 2811 name = "utf8_iter" 2812 version = "1.0.4"
+4 -2
Cargo.toml
··· 4 edition = "2024" 5 6 [dependencies] 7 atrium-api = "0.25.4" 8 atrium-common = "0.1.2" 9 atrium-identity = "0.1.5" ··· 11 atrium-xrpc = "0.12.3" 12 atrium-xrpc-client = { version = "0.5.14", features=["isahc"] } 13 clap = { version = "4.5.41", features = ["cargo"] } 14 - fuser = "0.15.1" 15 futures = "0.3.31" 16 hickory-resolver = "0.25.2" 17 indexmap = "2.10.0" ··· 22 serde_ipld_dagcbor = "0.6.3" 23 serde_json = "1.0.141" 24 thiserror = "2.0.12" 25 - tokio = { version = "1.46.1", features = ["fs"] } 26 xdg = "3.0.0"
··· 4 edition = "2024" 5 6 [dependencies] 7 + anyhow = "1.0" 8 atrium-api = "0.25.4" 9 atrium-common = "0.1.2" 10 atrium-identity = "0.1.5" ··· 12 atrium-xrpc = "0.12.3" 13 atrium-xrpc-client = { version = "0.5.14", features=["isahc"] } 14 clap = { version = "4.5.41", features = ["cargo"] } 15 + fuser = { version = "0.15.1", features = ["abi-7-18"] } 16 futures = "0.3.31" 17 hickory-resolver = "0.25.2" 18 indexmap = "2.10.0" ··· 23 serde_ipld_dagcbor = "0.6.3" 24 serde_json = "1.0.141" 25 thiserror = "2.0.12" 26 + tokio = { version = "1.46.1", features = ["fs", "sync", "rt-multi-thread"] } 27 + tokio-tungstenite = { version = "0.24", features = ["native-tls"] } 28 xdg = "3.0.0"
+318
src/firehose.rs
···
··· 1 + use anyhow::{anyhow, Result}; 2 + use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID}; 3 + use atrium_api::client::AtpServiceClient; 4 + use atrium_api::com; 5 + use atrium_api::types; 6 + use atrium_xrpc_client::isahc::IsahcClient; 7 + use futures::StreamExt; 8 + use ipld_core::ipld::Ipld; 9 + use std::io::Cursor; 10 + use std::sync::{Arc, Mutex}; 11 + use tokio_tungstenite::connect_async; 12 + use tokio_tungstenite::tungstenite::Message; 13 + 14 + use crate::fs::{PdsFsCollection, PdsFsEntry, PdsFsRecord}; 15 + use indexmap::{IndexMap, IndexSet}; 16 + 17 + /// Frame header types for WebSocket messages 18 + #[derive(Debug, Clone, PartialEq, Eq)] 19 + enum FrameHeader { 20 + Message(Option<String>), 21 + Error, 22 + } 23 + 24 + impl TryFrom<Ipld> for FrameHeader { 25 + type Error = anyhow::Error; 26 + 27 + fn try_from(value: Ipld) -> Result<Self> { 28 + if let Ipld::Map(map) = value { 29 + if let Some(Ipld::Integer(i)) = map.get("op") { 30 + match i { 31 + 1 => { 32 + let t = if let Some(Ipld::String(s)) = map.get("t") { 33 + Some(s.clone()) 34 + } else { 35 + None 36 + }; 37 + return Ok(FrameHeader::Message(t)); 38 + } 39 + -1 => return Ok(FrameHeader::Error), 40 + _ => {} 41 + } 42 + } 43 + } 44 + Err(anyhow!("invalid frame type")) 45 + } 46 + } 47 + 48 + /// Frame types for parsed WebSocket messages 49 + #[derive(Debug, Clone, PartialEq, Eq)] 50 + pub enum Frame { 51 + Message(Option<String>, MessageFrame), 52 + Error(ErrorFrame), 53 + } 54 + 55 + #[derive(Debug, Clone, PartialEq, Eq)] 56 + pub struct MessageFrame { 57 + pub body: Vec<u8>, 58 + } 59 + 60 + #[derive(Debug, Clone, PartialEq, Eq)] 61 + pub struct ErrorFrame {} 62 + 63 + impl TryFrom<&[u8]> for Frame { 64 + type Error = anyhow::Error; 65 + 66 + fn try_from(value: &[u8]) -> Result<Self> { 67 + let mut cursor = Cursor::new(value); 68 + let (left, right) = match serde_ipld_dagcbor::from_reader::<Ipld, _>(&mut cursor) { 69 + Err(serde_ipld_dagcbor::DecodeError::TrailingData) => { 70 + value.split_at(cursor.position() as usize) 71 + } 72 + _ => { 73 + return Err(anyhow!("invalid frame type")); 74 + } 75 + }; 76 + let header = FrameHeader::try_from(serde_ipld_dagcbor::from_slice::<Ipld>(left)?)?; 77 + if let FrameHeader::Message(t) = &header { 78 + Ok(Frame::Message(t.clone(), MessageFrame { body: right.to_vec() })) 79 + } else { 80 + Ok(Frame::Error(ErrorFrame {})) 81 + } 82 + } 83 + } 84 + 85 + /// Subscribe to a repo's firehose and update inodes on changes 86 + pub async fn subscribe_to_repo<R>( 87 + did: String, 88 + pds: String, 89 + inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>, 90 + sizes: Arc<Mutex<IndexMap<usize, u64>>>, 91 + content_cache: Arc<Mutex<IndexMap<String, String>>>, 92 + notifier: fuser::Notifier, 93 + ) -> Result<()> 94 + where 95 + R: atrium_repo::blockstore::AsyncBlockStoreRead, 96 + { 97 + // Strip https:// or http:// prefix from PDS URL if present 98 + let pds_host = pds.trim_start_matches("https://").trim_start_matches("http://"); 99 + let url = format!("wss://{}/xrpc/{}", pds_host, NSID); 100 + println!("Connecting to firehose: {}", url); 101 + 102 + let (mut stream, _) = connect_async(url).await?; 103 + println!("Connected to firehose for {}", did); 104 + 105 + loop { 106 + match stream.next().await { 107 + Some(Ok(Message::Binary(data))) => { 108 + if let Ok(Frame::Message(Some(t), msg)) = Frame::try_from(data.as_slice()) { 109 + if t.as_str() == "#commit" { 110 + if let Ok(commit) = serde_ipld_dagcbor::from_reader::<Commit, _>(msg.body.as_slice()) { 111 + // Only process commits for our DID 112 + if commit.repo.as_str() == did { 113 + if let Err(e) = handle_commit(&commit, &inodes, &sizes, &content_cache, &did, &pds, &notifier).await { 114 + eprintln!("Error handling commit: {:?}", e); 115 + } 116 + } 117 + } 118 + } 119 + } 120 + } 121 + Some(Ok(_)) => {} // Ignore other message types 122 + Some(Err(e)) => { 123 + eprintln!("WebSocket error: {}", e); 124 + break; 125 + } 126 + None => { 127 + eprintln!("WebSocket closed"); 128 + break; 129 + } 130 + } 131 + } 132 + 133 + Ok(()) 134 + } 135 + 136 + /// Handle a commit by updating the inode tree and notifying Finder 137 + async fn handle_commit( 138 + commit: &Commit, 139 + inodes: &Arc<Mutex<IndexSet<PdsFsEntry>>>, 140 + sizes: &Arc<Mutex<IndexMap<usize, u64>>>, 141 + content_cache: &Arc<Mutex<IndexMap<String, String>>>, 142 + did: &str, 143 + pds: &str, 144 + notifier: &fuser::Notifier, 145 + ) -> Result<()> { 146 + // Find the DID inode 147 + let did_entry = PdsFsEntry::Did(did.to_string()); 148 + let did_inode = { 149 + let inodes_lock = inodes.lock().unwrap(); 150 + inodes_lock.get_index_of(&did_entry) 151 + }; 152 + 153 + let Some(did_inode) = did_inode else { 154 + return Err(anyhow!("DID not found in inodes")); 155 + }; 156 + 157 + for op in &commit.ops { 158 + let Some((collection, rkey)) = op.path.split_once('/') else { 159 + continue; 160 + }; 161 + 162 + match op.action.as_str() { 163 + "create" => { 164 + // Fetch the record from PDS 165 + let record_key = format!("{}/{}", collection, rkey); 166 + let cache_key = format!("{}/{}", did, record_key); 167 + 168 + // Fetch record content from PDS 169 + match fetch_record(pds, did, collection, rkey).await { 170 + Ok(content) => { 171 + let content_len = content.len() as u64; 172 + 173 + // Add the record to inodes 174 + let (collection_inode, record_inode) = { 175 + let mut inodes_lock = inodes.lock().unwrap(); 176 + 177 + // Ensure collection exists 178 + let collection_entry = PdsFsEntry::Collection(PdsFsCollection { 179 + parent: did_inode, 180 + nsid: collection.to_string(), 181 + }); 182 + let (collection_inode, _) = inodes_lock.insert_full(collection_entry); 183 + 184 + // Add the record 185 + let record_entry = PdsFsEntry::Record(PdsFsRecord { 186 + parent: collection_inode, 187 + rkey: rkey.to_string(), 188 + }); 189 + let (record_inode, _) = inodes_lock.insert_full(record_entry); 190 + (collection_inode, record_inode) 191 + }; 192 + 193 + // Cache the content and size 194 + content_cache.lock().unwrap().insert(cache_key, content); 195 + sizes.lock().unwrap().insert(record_inode, content_len); 196 + 197 + // Notify Finder about the new file (release lock first) 198 + let filename = format!("{}.json", rkey); 199 + if let Err(e) = notifier.inval_entry(collection_inode as u64, filename.as_ref()) { 200 + eprintln!("Failed to invalidate entry for {}: {}", filename, e); 201 + } 202 + 203 + println!("Created: {}/{}", collection, rkey); 204 + } 205 + Err(e) => { 206 + eprintln!("Failed to fetch record {}/{}: {}", collection, rkey, e); 207 + } 208 + } 209 + } 210 + "delete" => { 211 + // Get inodes before removing 212 + let (collection_inode_opt, child_inode_opt) = { 213 + let mut inodes_lock = inodes.lock().unwrap(); 214 + 215 + // Find the collection 216 + let collection_entry = PdsFsEntry::Collection(PdsFsCollection { 217 + parent: did_inode, 218 + nsid: collection.to_string(), 219 + }); 220 + let collection_inode = inodes_lock.get_index_of(&collection_entry); 221 + 222 + // Find and remove the record 223 + let child_inode = if let Some(coll_ino) = collection_inode { 224 + let record_entry = PdsFsEntry::Record(PdsFsRecord { 225 + parent: coll_ino, 226 + rkey: rkey.to_string(), 227 + }); 228 + let child_ino = inodes_lock.get_index_of(&record_entry); 229 + inodes_lock.shift_remove(&record_entry); 230 + child_ino 231 + } else { 232 + None 233 + }; 234 + 235 + (collection_inode, child_inode) 236 + }; 237 + 238 + // Notify Finder about the deletion (release lock first) 239 + if let (Some(coll_ino), Some(child_ino)) = (collection_inode_opt, child_inode_opt) { 240 + // Remove from caches 241 + sizes.lock().unwrap().shift_remove(&child_ino); 242 + let cache_key = format!("{}/{}/{}", did, collection, rkey); 243 + content_cache.lock().unwrap().shift_remove(&cache_key); 244 + 245 + let filename = format!("{}.json", rkey); 246 + if let Err(e) = notifier.delete(coll_ino as u64, child_ino as u64, filename.as_ref()) { 247 + eprintln!("Failed to notify deletion for {}: {}", filename, e); 248 + } 249 + } 250 + 251 + println!("Deleted: {}/{}", collection, rkey); 252 + } 253 + "update" => { 254 + // For updates, invalidate the inode so content is re-fetched 255 + let record_inode_opt = { 256 + let inodes_lock = inodes.lock().unwrap(); 257 + let collection_entry = PdsFsEntry::Collection(PdsFsCollection { 258 + parent: did_inode, 259 + nsid: collection.to_string(), 260 + }); 261 + 262 + if let Some(collection_inode) = inodes_lock.get_index_of(&collection_entry) { 263 + let record_entry = PdsFsEntry::Record(PdsFsRecord { 264 + parent: collection_inode, 265 + rkey: rkey.to_string(), 266 + }); 267 + inodes_lock.get_index_of(&record_entry) 268 + } else { 269 + None 270 + } 271 + }; 272 + 273 + // Notify Finder to invalidate the inode (release lock first) 274 + if let Some(record_ino) = record_inode_opt { 275 + // Clear caches so content is recalculated 276 + sizes.lock().unwrap().shift_remove(&record_ino); 277 + let cache_key = format!("{}/{}/{}", did, collection, rkey); 278 + content_cache.lock().unwrap().shift_remove(&cache_key); 279 + 280 + // Invalidate the entire inode (metadata and all data) 281 + if let Err(e) = notifier.inval_inode(record_ino as u64, 0, 0) { 282 + eprintln!("Failed to invalidate inode for {}/{}: {}", collection, rkey, e); 283 + } 284 + } 285 + 286 + println!("Updated: {}/{}", collection, rkey); 287 + } 288 + _ => {} 289 + } 290 + } 291 + 292 + Ok(()) 293 + } 294 + 295 + /// Fetch a record from the PDS 296 + async fn fetch_record(pds: &str, did: &str, collection: &str, rkey: &str) -> Result<String> { 297 + let client = AtpServiceClient::new(IsahcClient::new(pds)); 298 + let did = types::string::Did::new(did.to_string()).map_err(|e| anyhow!(e))?; 299 + let collection_nsid = types::string::Nsid::new(collection.to_string()).map_err(|e| anyhow!(e))?; 300 + let record_key = types::string::RecordKey::new(rkey.to_string()).map_err(|e| anyhow!(e))?; 301 + 302 + let response = client 303 + .service 304 + .com 305 + .atproto 306 + .repo 307 + .get_record(com::atproto::repo::get_record::Parameters::from( 308 + com::atproto::repo::get_record::ParametersData { 309 + cid: None, 310 + collection: collection_nsid, 311 + repo: types::string::AtIdentifier::Did(did), 312 + rkey: record_key, 313 + } 314 + )) 315 + .await?; 316 + 317 + Ok(serde_json::to_string_pretty(&response.value)?) 318 + }

Submissions

sign up or login to add to the discussion
danabra.mov submitted #1
1 commit
expand
pretty print json
danabra.mov

fml this is too hard

danabra.mov

let me just resubmit everything as a stack

closed without merging
danabra.mov submitted #0
4 commits
expand
show files as .json
initialize tokio once
disable cache
watch firehose
danabra.mov

i guess "firehose" is inaccurate, the code just watches the pds. but whatever