mount an atproto PDS repository as a FUSE filesystem

Compare changes

Choose any two refs to compare.

+5
.tangled/workflows/build.yml
··· 8 8 - cargo 9 9 - rustc 10 10 - rustfmt 11 + - pkg-config 12 + - fuse3 13 + - openssl 11 14 12 15 steps: 13 16 - name: build 14 17 command: | 18 + export PKG_CONFIG_PATH="${PKG_CONFIG_PATH:+$PKG_CONFIG_PATH:}$(nix build nixpkgs#openssl.dev --no-link --print-out-paths)/lib/pkgconfig" 19 + export PKG_CONFIG_PATH="${PKG_CONFIG_PATH}:$(nix build nixpkgs#fuse3.dev --no-link --print-out-paths)/lib/pkgconfig" 15 20 cargo build --all --verbose 16 21 17 22
+101 -7
Cargo.lock
··· 98 98 ] 99 99 100 100 [[package]] 101 + name = "anyhow" 102 + version = "1.0.100" 103 + source = "registry+https://github.com/rust-lang/crates.io-index" 104 + checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" 105 + 106 + [[package]] 101 107 name = "async-channel" 102 108 version = "1.9.0" 103 109 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 327 333 version = "3.19.0" 328 334 source = "registry+https://github.com/rust-lang/crates.io-index" 329 335 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 336 + 337 + [[package]] 338 + name = "byteorder" 339 + version = "1.5.0" 340 + source = "registry+https://github.com/rust-lang/crates.io-index" 341 + checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" 330 342 331 343 [[package]] 332 344 name = "bytes" ··· 669 681 checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" 670 682 dependencies = [ 671 683 "libc", 672 - "windows-sys 0.59.0", 684 + "windows-sys 0.60.2", 673 685 ] 674 686 675 687 [[package]] ··· 991 1003 "idna", 992 1004 "ipnet", 993 1005 "once_cell", 994 - "rand", 1006 + "rand 0.9.2", 995 1007 "ring", 996 1008 "thiserror 2.0.12", 997 1009 "tinyvec", ··· 1013 1025 "moka", 1014 1026 "once_cell", 1015 1027 "parking_lot", 1016 - "rand", 1028 + "rand 0.9.2", 1017 1029 "resolv-conf", 1018 1030 "smallvec", 1019 1031 "thiserror 2.0.12", ··· 1757 1769 name = "pdsfs" 1758 1770 version = "0.1.0" 1759 1771 dependencies = [ 1772 + "anyhow", 1760 1773 "atrium-api", 1761 1774 "atrium-common", 1762 1775 "atrium-identity", ··· 1776 1789 "serde_json", 1777 1790 "thiserror 2.0.12", 1778 1791 "tokio", 1792 + "tokio-tungstenite", 1779 1793 "xdg", 1780 1794 ] 1781 1795 ··· 1889 1903 1890 1904 [[package]] 1891 1905 name = "rand" 1906 + version = "0.8.5" 1907 + source = "registry+https://github.com/rust-lang/crates.io-index" 1908 + checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" 1909 + dependencies = [ 1910 + "libc", 1911 + "rand_chacha 0.3.1", 1912 + "rand_core 0.6.4", 1913 + ] 1914 + 1915 + [[package]] 1916 + name = "rand" 1892 1917 version = "0.9.2" 1893 1918 source = "registry+https://github.com/rust-lang/crates.io-index" 1894 1919 checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" 1895 1920 dependencies = [ 1896 - "rand_chacha", 1897 - "rand_core", 1921 + "rand_chacha 0.9.0", 1922 + "rand_core 0.9.3", 1923 + ] 1924 + 1925 + [[package]] 1926 + name = "rand_chacha" 1927 + version = "0.3.1" 1928 + source = "registry+https://github.com/rust-lang/crates.io-index" 1929 + checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" 1930 + dependencies = [ 1931 + "ppv-lite86", 1932 + "rand_core 0.6.4", 1898 1933 ] 1899 1934 1900 1935 [[package]] ··· 1904 1939 checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" 1905 1940 dependencies = [ 1906 1941 "ppv-lite86", 1907 - "rand_core", 1942 + "rand_core 0.9.3", 1943 + ] 1944 + 1945 + [[package]] 1946 + name = "rand_core" 1947 + version = "0.6.4" 1948 + source = "registry+https://github.com/rust-lang/crates.io-index" 1949 + checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" 1950 + dependencies = [ 1951 + "getrandom 0.2.16", 1908 1952 ] 1909 1953 1910 1954 [[package]] ··· 2057 2101 "errno", 2058 2102 "libc", 2059 2103 "linux-raw-sys", 2060 - "windows-sys 0.59.0", 2104 + "windows-sys 0.60.2", 2061 2105 ] 2062 2106 2063 2107 [[package]] ··· 2234 2278 ] 2235 2279 2236 2280 [[package]] 2281 + name = "sha1" 2282 + version = "0.10.6" 2283 + source = "registry+https://github.com/rust-lang/crates.io-index" 2284 + checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" 2285 + dependencies = [ 2286 + "cfg-if", 2287 + "cpufeatures", 2288 + "digest", 2289 + ] 2290 + 2291 + [[package]] 2237 2292 name = "sha2" 2238 2293 version = "0.10.9" 2239 2294 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2515 2570 ] 2516 2571 2517 2572 [[package]] 2573 + name = "tokio-tungstenite" 2574 + version = "0.24.0" 2575 + source = "registry+https://github.com/rust-lang/crates.io-index" 2576 + checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" 2577 + dependencies = [ 2578 + "futures-util", 2579 + "log", 2580 + "native-tls", 2581 + "tokio", 2582 + "tokio-native-tls", 2583 + "tungstenite", 2584 + ] 2585 + 2586 + [[package]] 2518 2587 name = "tokio-util" 2519 2588 version = "0.7.15" 2520 2589 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2663 2732 checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" 2664 2733 2665 2734 [[package]] 2735 + name = "tungstenite" 2736 + version = "0.24.0" 2737 + source = "registry+https://github.com/rust-lang/crates.io-index" 2738 + checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" 2739 + dependencies = [ 2740 + "byteorder", 2741 + "bytes", 2742 + "data-encoding", 2743 + "http 1.3.1", 2744 + "httparse", 2745 + "log", 2746 + "native-tls", 2747 + "rand 0.8.5", 2748 + "sha1", 2749 + "thiserror 1.0.69", 2750 + "utf-8", 2751 + ] 2752 + 2753 + [[package]] 2666 2754 name = "typenum" 2667 2755 version = "1.18.0" 2668 2756 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2712 2800 "idna", 2713 2801 "percent-encoding", 2714 2802 ] 2803 + 2804 + [[package]] 2805 + name = "utf-8" 2806 + version = "0.7.6" 2807 + source = "registry+https://github.com/rust-lang/crates.io-index" 2808 + checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" 2715 2809 2716 2810 [[package]] 2717 2811 name = "utf8_iter"
+4 -2
Cargo.toml
··· 4 4 edition = "2024" 5 5 6 6 [dependencies] 7 + anyhow = "1.0" 7 8 atrium-api = "0.25.4" 8 9 atrium-common = "0.1.2" 9 10 atrium-identity = "0.1.5" ··· 11 12 atrium-xrpc = "0.12.3" 12 13 atrium-xrpc-client = { version = "0.5.14", features=["isahc"] } 13 14 clap = { version = "4.5.41", features = ["cargo"] } 14 - fuser = "0.15.1" 15 + fuser = { version = "0.15.1", features = ["abi-7-18"] } 15 16 futures = "0.3.31" 16 17 hickory-resolver = "0.25.2" 17 18 indexmap = "2.10.0" ··· 22 23 serde_ipld_dagcbor = "0.6.3" 23 24 serde_json = "1.0.141" 24 25 thiserror = "2.0.12" 25 - tokio = { version = "1.46.1", features = ["fs"] } 26 + tokio = { version = "1.46.1", features = ["fs", "sync", "rt-multi-thread"] } 27 + tokio-tungstenite = { version = "0.24", features = ["native-tls"] } 26 28 xdg = "3.0.0"
-3
allowed_signers
··· 1 - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQCivdIHm5jcKSMeVwdIIQj7bucOOBKnm2UD7TRRxOlo79UvdFDOg3W+hGYJfCVmDrqj7Ea1VMiytF/rbO45bcnsgpDce7A289NWBohsRMYXQ580/CHPepYhXTNL7mYOO/zTCcP6ABfK6d/iDl7J+6gx0NCAfyAL47fqqsIMEh4OvvghJvhhIMk/eQvEdXmqWXVqsX5d5heQCej8ii/1KsrVM/RYTcuIZuZ4nQyZEgIu+8A78dnKFos5Lg+tYUKKgjXyN3hbwpwgU4sSP9KgbrnyuVPBds2rulznzY0m/DRcJI0wyps5GoawX+VHpkP/jk/fhqblA9BRh8eO9fqSPMz7pOcQNdpZHpUoBqqKMQihkhq6/JVvgCRL7aka9umc9bcfh52or7fDSXTxNlFtrX8Cfw4Y5cjhCpGVXI8kbXgYGU9IHEB6DEEFwEKha6ND5Zr8JkXhhxpY4hFddQIg7/TxEmP0jpO7dixp0bGPjHauIJqe9iACK0ZCSNReVEdHPqc= steveklabnik@DESKTOP-VV370NK 2 - ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAICJPYX06+qKr9IHWfkgCtHbExoBOOwS/+iAWbog9bAdk icy@wyndle 3 - ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAICMmmaDFqSmbQLnPuTtg32wBdJs1xsituz3jrJBqlM1u anil@recoil.org
+96
flake.lock
··· 1 + { 2 + "nodes": { 3 + "flake-utils": { 4 + "inputs": { 5 + "systems": "systems" 6 + }, 7 + "locked": { 8 + "lastModified": 1731533236, 9 + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", 10 + "owner": "numtide", 11 + "repo": "flake-utils", 12 + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", 13 + "type": "github" 14 + }, 15 + "original": { 16 + "owner": "numtide", 17 + "repo": "flake-utils", 18 + "type": "github" 19 + } 20 + }, 21 + "nixpkgs": { 22 + "locked": { 23 + "lastModified": 1757487488, 24 + "narHash": "sha256-zwE/e7CuPJUWKdvvTCB7iunV4E/+G0lKfv4kk/5Izdg=", 25 + "owner": "NixOS", 26 + "repo": "nixpkgs", 27 + "rev": "ab0f3607a6c7486ea22229b92ed2d355f1482ee0", 28 + "type": "github" 29 + }, 30 + "original": { 31 + "owner": "NixOS", 32 + "ref": "nixos-unstable", 33 + "repo": "nixpkgs", 34 + "type": "github" 35 + } 36 + }, 37 + "nixpkgs_2": { 38 + "locked": { 39 + "lastModified": 1744536153, 40 + "narHash": "sha256-awS2zRgF4uTwrOKwwiJcByDzDOdo3Q1rPZbiHQg/N38=", 41 + "owner": "NixOS", 42 + "repo": "nixpkgs", 43 + "rev": "18dd725c29603f582cf1900e0d25f9f1063dbf11", 44 + "type": "github" 45 + }, 46 + "original": { 47 + "owner": "NixOS", 48 + "ref": "nixpkgs-unstable", 49 + "repo": "nixpkgs", 50 + "type": "github" 51 + } 52 + }, 53 + "root": { 54 + "inputs": { 55 + "flake-utils": "flake-utils", 56 + "nixpkgs": "nixpkgs", 57 + "rust-overlay": "rust-overlay" 58 + } 59 + }, 60 + "rust-overlay": { 61 + "inputs": { 62 + "nixpkgs": "nixpkgs_2" 63 + }, 64 + "locked": { 65 + "lastModified": 1757644300, 66 + "narHash": "sha256-bVIDYz31bCdZId441sqgkImnA7aYr2UQzQlyl+O2DWc=", 67 + "owner": "oxalica", 68 + "repo": "rust-overlay", 69 + "rev": "591c5ae84f066bdfc9797b217df392d58eafd088", 70 + "type": "github" 71 + }, 72 + "original": { 73 + "owner": "oxalica", 74 + "repo": "rust-overlay", 75 + "type": "github" 76 + } 77 + }, 78 + "systems": { 79 + "locked": { 80 + "lastModified": 1681028828, 81 + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", 82 + "owner": "nix-systems", 83 + "repo": "default", 84 + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", 85 + "type": "github" 86 + }, 87 + "original": { 88 + "owner": "nix-systems", 89 + "repo": "default", 90 + "type": "github" 91 + } 92 + } 93 + }, 94 + "root": "root", 95 + "version": 7 96 + }
+92
flake.nix
··· 1 + { 2 + description = "pdsfs - mount an atproto PDS repository as a FUSE filesystem"; 3 + 4 + inputs = { 5 + nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; 6 + rust-overlay.url = "github:oxalica/rust-overlay"; 7 + flake-utils.url = "github:numtide/flake-utils"; 8 + }; 9 + 10 + outputs = { self, nixpkgs, rust-overlay, flake-utils }: 11 + flake-utils.lib.eachDefaultSystem (system: 12 + let 13 + overlays = [ (import rust-overlay) ]; 14 + pkgs = import nixpkgs { 15 + inherit system overlays; 16 + }; 17 + 18 + rustToolchain = 19 + if builtins.pathExists ./rust-toolchain.toml 20 + then pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml 21 + else pkgs.rust-bin.stable.latest.default.override { 22 + extensions = [ "rust-src" "rust-analyzer" ]; 23 + }; 24 + 25 + nativeBuildInputs = with pkgs; [ 26 + rustToolchain 27 + pkg-config 28 + ]; 29 + 30 + buildInputs = with pkgs; [ 31 + fuse3 32 + openssl 33 + ] ++ lib.optionals stdenv.isDarwin [ 34 + darwin.apple_sdk.frameworks.Security 35 + darwin.apple_sdk.frameworks.CoreServices 36 + ]; 37 + 38 + pdsfs = pkgs.rustPlatform.buildRustPackage { 39 + pname = "pdsfs"; 40 + version = "0.1.0"; 41 + 42 + src = ./.; 43 + 44 + cargoLock = { 45 + lockFile = ./Cargo.lock; 46 + }; 47 + 48 + inherit nativeBuildInputs buildInputs; 49 + 50 + # Skip tests that require network access or FUSE capabilities 51 + doCheck = false; 52 + 53 + meta = with pkgs.lib; { 54 + description = "Mount an atproto PDS repository as a FUSE filesystem"; 55 + homepage = "https://github.com/tangled/pdsfs"; # Update with actual repository URL 56 + license = licenses.mit; # Update with actual license 57 + maintainers = [ ]; # Add maintainers if desired 58 + platforms = platforms.linux ++ platforms.darwin; 59 + }; 60 + }; 61 + in 62 + { 63 + packages = { 64 + default = pdsfs; 65 + pdsfs = pdsfs; 66 + }; 67 + 68 + devShells.default = pkgs.mkShell { 69 + inherit buildInputs nativeBuildInputs; 70 + 71 + shellHook = '' 72 + echo "pdsfs development environment" 73 + echo "Run 'cargo build' to build the project" 74 + echo "Run 'cargo run -- <handle>' to mount a PDS repository" 75 + echo "" 76 + echo "FUSE development notes:" 77 + echo "- Ensure you have FUSE permissions (add user to 'fuse' group if needed)" 78 + echo "- Use 'fusermount -u <mountpoint>' to unmount if needed" 79 + ''; 80 + }; 81 + 82 + # Allow building with different Rust versions 83 + devShells.rust-nightly = pkgs.mkShell { 84 + buildInputs = buildInputs ++ [ 85 + (pkgs.rust-bin.nightly.latest.default.override { 86 + extensions = [ "rust-src" "rust-analyzer" ]; 87 + }) 88 + pkgs.pkg-config 89 + ]; 90 + }; 91 + }); 92 + }
+318
src/firehose.rs
··· 1 + use anyhow::{anyhow, Result}; 2 + use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID}; 3 + use atrium_api::client::AtpServiceClient; 4 + use atrium_api::com; 5 + use atrium_api::types; 6 + use atrium_xrpc_client::isahc::IsahcClient; 7 + use futures::StreamExt; 8 + use ipld_core::ipld::Ipld; 9 + use std::io::Cursor; 10 + use std::sync::{Arc, Mutex}; 11 + use tokio_tungstenite::connect_async; 12 + use tokio_tungstenite::tungstenite::Message; 13 + 14 + use crate::fs::{PdsFsCollection, PdsFsEntry, PdsFsRecord}; 15 + use indexmap::{IndexMap, IndexSet}; 16 + 17 + /// Frame header types for WebSocket messages 18 + #[derive(Debug, Clone, PartialEq, Eq)] 19 + enum FrameHeader { 20 + Message(Option<String>), 21 + Error, 22 + } 23 + 24 + impl TryFrom<Ipld> for FrameHeader { 25 + type Error = anyhow::Error; 26 + 27 + fn try_from(value: Ipld) -> Result<Self> { 28 + if let Ipld::Map(map) = value { 29 + if let Some(Ipld::Integer(i)) = map.get("op") { 30 + match i { 31 + 1 => { 32 + let t = if let Some(Ipld::String(s)) = map.get("t") { 33 + Some(s.clone()) 34 + } else { 35 + None 36 + }; 37 + return Ok(FrameHeader::Message(t)); 38 + } 39 + -1 => return Ok(FrameHeader::Error), 40 + _ => {} 41 + } 42 + } 43 + } 44 + Err(anyhow!("invalid frame type")) 45 + } 46 + } 47 + 48 + /// Frame types for parsed WebSocket messages 49 + #[derive(Debug, Clone, PartialEq, Eq)] 50 + pub enum Frame { 51 + Message(Option<String>, MessageFrame), 52 + Error(ErrorFrame), 53 + } 54 + 55 + #[derive(Debug, Clone, PartialEq, Eq)] 56 + pub struct MessageFrame { 57 + pub body: Vec<u8>, 58 + } 59 + 60 + #[derive(Debug, Clone, PartialEq, Eq)] 61 + pub struct ErrorFrame {} 62 + 63 + impl TryFrom<&[u8]> for Frame { 64 + type Error = anyhow::Error; 65 + 66 + fn try_from(value: &[u8]) -> Result<Self> { 67 + let mut cursor = Cursor::new(value); 68 + let (left, right) = match serde_ipld_dagcbor::from_reader::<Ipld, _>(&mut cursor) { 69 + Err(serde_ipld_dagcbor::DecodeError::TrailingData) => { 70 + value.split_at(cursor.position() as usize) 71 + } 72 + _ => { 73 + return Err(anyhow!("invalid frame type")); 74 + } 75 + }; 76 + let header = FrameHeader::try_from(serde_ipld_dagcbor::from_slice::<Ipld>(left)?)?; 77 + if let FrameHeader::Message(t) = &header { 78 + Ok(Frame::Message(t.clone(), MessageFrame { body: right.to_vec() })) 79 + } else { 80 + Ok(Frame::Error(ErrorFrame {})) 81 + } 82 + } 83 + } 84 + 85 + /// Subscribe to a repo's firehose and update inodes on changes 86 + pub async fn subscribe_to_repo<R>( 87 + did: String, 88 + pds: String, 89 + inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>, 90 + sizes: Arc<Mutex<IndexMap<usize, u64>>>, 91 + content_cache: Arc<Mutex<IndexMap<String, String>>>, 92 + notifier: fuser::Notifier, 93 + ) -> Result<()> 94 + where 95 + R: atrium_repo::blockstore::AsyncBlockStoreRead, 96 + { 97 + // Strip https:// or http:// prefix from PDS URL if present 98 + let pds_host = pds.trim_start_matches("https://").trim_start_matches("http://"); 99 + let url = format!("wss://{}/xrpc/{}", pds_host, NSID); 100 + println!("Connecting to firehose: {}", url); 101 + 102 + let (mut stream, _) = connect_async(url).await?; 103 + println!("Connected to firehose for {}", did); 104 + 105 + loop { 106 + match stream.next().await { 107 + Some(Ok(Message::Binary(data))) => { 108 + if let Ok(Frame::Message(Some(t), msg)) = Frame::try_from(data.as_slice()) { 109 + if t.as_str() == "#commit" { 110 + if let Ok(commit) = serde_ipld_dagcbor::from_reader::<Commit, _>(msg.body.as_slice()) { 111 + // Only process commits for our DID 112 + if commit.repo.as_str() == did { 113 + if let Err(e) = handle_commit(&commit, &inodes, &sizes, &content_cache, &did, &pds, &notifier).await { 114 + eprintln!("Error handling commit: {:?}", e); 115 + } 116 + } 117 + } 118 + } 119 + } 120 + } 121 + Some(Ok(_)) => {} // Ignore other message types 122 + Some(Err(e)) => { 123 + eprintln!("WebSocket error: {}", e); 124 + break; 125 + } 126 + None => { 127 + eprintln!("WebSocket closed"); 128 + break; 129 + } 130 + } 131 + } 132 + 133 + Ok(()) 134 + } 135 + 136 + /// Handle a commit by updating the inode tree and notifying Finder 137 + async fn handle_commit( 138 + commit: &Commit, 139 + inodes: &Arc<Mutex<IndexSet<PdsFsEntry>>>, 140 + sizes: &Arc<Mutex<IndexMap<usize, u64>>>, 141 + content_cache: &Arc<Mutex<IndexMap<String, String>>>, 142 + did: &str, 143 + pds: &str, 144 + notifier: &fuser::Notifier, 145 + ) -> Result<()> { 146 + // Find the DID inode 147 + let did_entry = PdsFsEntry::Did(did.to_string()); 148 + let did_inode = { 149 + let inodes_lock = inodes.lock().unwrap(); 150 + inodes_lock.get_index_of(&did_entry) 151 + }; 152 + 153 + let Some(did_inode) = did_inode else { 154 + return Err(anyhow!("DID not found in inodes")); 155 + }; 156 + 157 + for op in &commit.ops { 158 + let Some((collection, rkey)) = op.path.split_once('/') else { 159 + continue; 160 + }; 161 + 162 + match op.action.as_str() { 163 + "create" => { 164 + // Fetch the record from PDS 165 + let record_key = format!("{}/{}", collection, rkey); 166 + let cache_key = format!("{}/{}", did, record_key); 167 + 168 + // Fetch record content from PDS 169 + match fetch_record(pds, did, collection, rkey).await { 170 + Ok(content) => { 171 + let content_len = content.len() as u64; 172 + 173 + // Add the record to inodes 174 + let (collection_inode, record_inode) = { 175 + let mut inodes_lock = inodes.lock().unwrap(); 176 + 177 + // Ensure collection exists 178 + let collection_entry = PdsFsEntry::Collection(PdsFsCollection { 179 + parent: did_inode, 180 + nsid: collection.to_string(), 181 + }); 182 + let (collection_inode, _) = inodes_lock.insert_full(collection_entry); 183 + 184 + // Add the record 185 + let record_entry = PdsFsEntry::Record(PdsFsRecord { 186 + parent: collection_inode, 187 + rkey: rkey.to_string(), 188 + }); 189 + let (record_inode, _) = inodes_lock.insert_full(record_entry); 190 + (collection_inode, record_inode) 191 + }; 192 + 193 + // Cache the content and size 194 + content_cache.lock().unwrap().insert(cache_key, content); 195 + sizes.lock().unwrap().insert(record_inode, content_len); 196 + 197 + // Notify Finder about the new file (release lock first) 198 + let filename = format!("{}.json", rkey); 199 + if let Err(e) = notifier.inval_entry(collection_inode as u64, filename.as_ref()) { 200 + eprintln!("Failed to invalidate entry for {}: {}", filename, e); 201 + } 202 + 203 + println!("Created: {}/{}", collection, rkey); 204 + } 205 + Err(e) => { 206 + eprintln!("Failed to fetch record {}/{}: {}", collection, rkey, e); 207 + } 208 + } 209 + } 210 + "delete" => { 211 + // Get inodes before removing 212 + let (collection_inode_opt, child_inode_opt) = { 213 + let mut inodes_lock = inodes.lock().unwrap(); 214 + 215 + // Find the collection 216 + let collection_entry = PdsFsEntry::Collection(PdsFsCollection { 217 + parent: did_inode, 218 + nsid: collection.to_string(), 219 + }); 220 + let collection_inode = inodes_lock.get_index_of(&collection_entry); 221 + 222 + // Find and remove the record 223 + let child_inode = if let Some(coll_ino) = collection_inode { 224 + let record_entry = PdsFsEntry::Record(PdsFsRecord { 225 + parent: coll_ino, 226 + rkey: rkey.to_string(), 227 + }); 228 + let child_ino = inodes_lock.get_index_of(&record_entry); 229 + inodes_lock.shift_remove(&record_entry); 230 + child_ino 231 + } else { 232 + None 233 + }; 234 + 235 + (collection_inode, child_inode) 236 + }; 237 + 238 + // Notify Finder about the deletion (release lock first) 239 + if let (Some(coll_ino), Some(child_ino)) = (collection_inode_opt, child_inode_opt) { 240 + // Remove from caches 241 + sizes.lock().unwrap().shift_remove(&child_ino); 242 + let cache_key = format!("{}/{}/{}", did, collection, rkey); 243 + content_cache.lock().unwrap().shift_remove(&cache_key); 244 + 245 + let filename = format!("{}.json", rkey); 246 + if let Err(e) = notifier.delete(coll_ino as u64, child_ino as u64, filename.as_ref()) { 247 + eprintln!("Failed to notify deletion for {}: {}", filename, e); 248 + } 249 + } 250 + 251 + println!("Deleted: {}/{}", collection, rkey); 252 + } 253 + "update" => { 254 + // For updates, invalidate the inode so content is re-fetched 255 + let record_inode_opt = { 256 + let inodes_lock = inodes.lock().unwrap(); 257 + let collection_entry = PdsFsEntry::Collection(PdsFsCollection { 258 + parent: did_inode, 259 + nsid: collection.to_string(), 260 + }); 261 + 262 + if let Some(collection_inode) = inodes_lock.get_index_of(&collection_entry) { 263 + let record_entry = PdsFsEntry::Record(PdsFsRecord { 264 + parent: collection_inode, 265 + rkey: rkey.to_string(), 266 + }); 267 + inodes_lock.get_index_of(&record_entry) 268 + } else { 269 + None 270 + } 271 + }; 272 + 273 + // Notify Finder to invalidate the inode (release lock first) 274 + if let Some(record_ino) = record_inode_opt { 275 + // Clear caches so content is recalculated 276 + sizes.lock().unwrap().shift_remove(&record_ino); 277 + let cache_key = format!("{}/{}/{}", did, collection, rkey); 278 + content_cache.lock().unwrap().shift_remove(&cache_key); 279 + 280 + // Invalidate the entire inode (metadata and all data) 281 + if let Err(e) = notifier.inval_inode(record_ino as u64, 0, 0) { 282 + eprintln!("Failed to invalidate inode for {}/{}: {}", collection, rkey, e); 283 + } 284 + } 285 + 286 + println!("Updated: {}/{}", collection, rkey); 287 + } 288 + _ => {} 289 + } 290 + } 291 + 292 + Ok(()) 293 + } 294 + 295 + /// Fetch a record from the PDS 296 + async fn fetch_record(pds: &str, did: &str, collection: &str, rkey: &str) -> Result<String> { 297 + let client = AtpServiceClient::new(IsahcClient::new(pds)); 298 + let did = types::string::Did::new(did.to_string()).map_err(|e| anyhow!(e))?; 299 + let collection_nsid = types::string::Nsid::new(collection.to_string()).map_err(|e| anyhow!(e))?; 300 + let record_key = types::string::RecordKey::new(rkey.to_string()).map_err(|e| anyhow!(e))?; 301 + 302 + let response = client 303 + .service 304 + .com 305 + .atproto 306 + .repo 307 + .get_record(com::atproto::repo::get_record::Parameters::from( 308 + com::atproto::repo::get_record::ParametersData { 309 + cid: None, 310 + collection: collection_nsid, 311 + repo: types::string::AtIdentifier::Did(did), 312 + rkey: record_key, 313 + } 314 + )) 315 + .await?; 316 + 317 + Ok(serde_json::to_string_pretty(&response.value)?) 318 + }
+154 -64
src/fs.rs
··· 1 - use std::time; 1 + use std::sync::{Arc, Mutex}; 2 + use std::time::{self, SystemTime, UNIX_EPOCH, Duration}; 2 3 3 4 use atrium_repo::{Repository, blockstore::AsyncBlockStoreRead}; 4 5 use futures::StreamExt; ··· 6 7 7 8 type Inode = usize; 8 9 10 + /// Decode a TID (timestamp identifier) to get the timestamp in microseconds since Unix epoch 11 + fn tid_to_timestamp(tid: &str) -> Option<SystemTime> { 12 + const S32_CHAR: &[u8] = b"234567abcdefghijklmnopqrstuvwxyz"; 13 + 14 + if tid.len() != 13 { 15 + return None; 16 + } 17 + 18 + let mut value: u64 = 0; 19 + for ch in tid.chars() { 20 + let pos = S32_CHAR.iter().position(|&c| c as char == ch)?; 21 + // Big-endian: first character is most significant 22 + value = (value << 5) | (pos as u64); 23 + } 24 + 25 + // Extract timestamp from upper bits (shifted by 10) 26 + let micros = value >> 10; 27 + 28 + UNIX_EPOCH.checked_add(Duration::from_micros(micros)) 29 + } 30 + 9 31 pub struct PdsFs<R> { 10 - repos: IndexMap<String, Repository<R>>, 11 - inodes: IndexSet<PdsFsEntry>, 32 + repos: Arc<Mutex<IndexMap<String, Repository<R>>>>, 33 + inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>, 34 + sizes: Arc<Mutex<IndexMap<Inode, u64>>>, 35 + content_cache: Arc<Mutex<IndexMap<String, String>>>, 36 + rt: tokio::runtime::Runtime, 12 37 } 13 38 14 39 #[derive(Debug, Clone, PartialEq, Eq, Hash)] ··· 46 71 47 72 #[derive(Debug, Clone, PartialEq, Eq, Hash)] 48 73 pub struct PdsFsCollection { 49 - parent: Inode, 50 - nsid: String, 74 + pub parent: Inode, 75 + pub nsid: String, 51 76 } 52 77 53 78 #[derive(Debug, Clone, PartialEq, Eq, Hash)] 54 79 pub struct PdsFsRecord { 55 - parent: Inode, 56 - rkey: String, 80 + pub parent: Inode, 81 + pub rkey: String, 57 82 } 58 83 59 84 // impl PdsFsRecord { ··· 89 114 { 90 115 pub fn new() -> Self { 91 116 PdsFs { 92 - repos: Default::default(), 93 - inodes: IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]), 117 + repos: Arc::new(Mutex::new(Default::default())), 118 + inodes: Arc::new(Mutex::new(IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]))), 119 + sizes: Arc::new(Mutex::new(Default::default())), 120 + content_cache: Arc::new(Mutex::new(Default::default())), 121 + rt: tokio::runtime::Runtime::new().unwrap(), 94 122 } 95 123 } 96 124 125 + pub fn get_shared_state(&self) -> (Arc<Mutex<IndexMap<String, Repository<R>>>>, Arc<Mutex<IndexSet<PdsFsEntry>>>, Arc<Mutex<IndexMap<Inode, u64>>>, Arc<Mutex<IndexMap<String, String>>>) { 126 + (Arc::clone(&self.repos), Arc::clone(&self.inodes), Arc::clone(&self.sizes), Arc::clone(&self.content_cache)) 127 + } 128 + 97 129 pub async fn add(&mut self, did: String, mut repo: Repository<R>) { 98 130 let mut mst = repo.tree(); 99 131 100 - let (did_inode, _) = self.inodes.insert_full(PdsFsEntry::Did(did.clone())); 132 + let did_inode = { 133 + let mut inodes = self.inodes.lock().unwrap(); 134 + let (did_inode, _) = inodes.insert_full(PdsFsEntry::Did(did.clone())); 135 + did_inode 136 + }; 101 137 102 138 let mut keys = Box::pin(mst.keys()); 103 139 while let Some(Ok(key)) = keys.next().await { 104 140 if let Some((collection_name, rkey)) = key.split_once("/") { 105 - let (collection_inode, _) = 106 - self.inodes 107 - .insert_full(PdsFsEntry::Collection(PdsFsCollection { 108 - parent: did_inode, 109 - nsid: collection_name.to_owned(), 110 - })); 141 + let mut inodes = self.inodes.lock().unwrap(); 142 + let (collection_inode, _) = inodes.insert_full(PdsFsEntry::Collection(PdsFsCollection { 143 + parent: did_inode, 144 + nsid: collection_name.to_owned(), 145 + })); 111 146 112 - self.inodes.insert(PdsFsEntry::Record(PdsFsRecord { 147 + inodes.insert(PdsFsEntry::Record(PdsFsRecord { 113 148 parent: collection_inode, 114 149 rkey: rkey.to_owned(), 115 150 })); ··· 119 154 drop(keys); 120 155 drop(mst); 121 156 122 - self.repos.insert(did, repo); 157 + self.repos.lock().unwrap().insert(did, repo); 123 158 } 124 159 125 160 fn attr(&mut self, ino: u64) -> fuser::FileAttr { 126 - match self.inodes.get_index(ino as usize) { 161 + let inodes = self.inodes.lock().unwrap(); 162 + match inodes.get_index(ino as usize) { 127 163 Some(PdsFsEntry::Root) => ROOTDIR_ATTR, 128 164 Some(PdsFsEntry::Collection(_)) => fuser::FileAttr { 129 165 ino, ··· 160 196 blksize: BLKSIZE, 161 197 }, 162 198 Some(PdsFsEntry::Record(r)) => { 163 - let col = self.inodes[r.parent].unwrap_collection(); 164 - let did = self.inodes[col.parent].unwrap_did(); 165 - let repo = &mut self.repos[did]; 166 - let key = format!("{}/{}", col.nsid, r.rkey); 167 - let rt = tokio::runtime::Runtime::new().unwrap(); 168 - let size = rt 169 - .block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) 170 - .ok() 171 - .flatten() 172 - .map_or(0, |v| serde_json::to_string(&v).unwrap().len()) 173 - as u64; 199 + let col = inodes[r.parent].unwrap_collection(); 200 + let did = inodes[col.parent].unwrap_did().clone(); 201 + let rkey = r.rkey.clone(); 202 + let collection_nsid = col.nsid.clone(); 203 + drop(inodes); 204 + 205 + // Check cache first 206 + let size = { 207 + let sizes = self.sizes.lock().unwrap(); 208 + if let Some(&cached_size) = sizes.get(&(ino as usize)) { 209 + cached_size 210 + } else { 211 + drop(sizes); 212 + // Not in cache, try to fetch from repo 213 + let mut repos = self.repos.lock().unwrap(); 214 + let repo = &mut repos[&did]; 215 + let key = format!("{}/{}", collection_nsid, rkey); 216 + let size = self 217 + .rt 218 + .block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) 219 + .ok() 220 + .flatten() 221 + .map_or(500, |v| serde_json::to_string_pretty(&v).unwrap().len()) 222 + as u64; 223 + // Cache it for next time 224 + self.sizes.lock().unwrap().insert(ino as usize, size); 225 + size 226 + } 227 + }; 174 228 let blocks = ((size as u32 + BLKSIZE - 1) / BLKSIZE) as u64; 229 + 230 + // Decode TID to get creation timestamp 231 + let timestamp = tid_to_timestamp(&rkey).unwrap_or(time::UNIX_EPOCH); 232 + 175 233 fuser::FileAttr { 176 234 ino, 177 235 size, 178 236 blocks, 179 - atime: time::UNIX_EPOCH, 180 - mtime: time::UNIX_EPOCH, 181 - ctime: time::UNIX_EPOCH, 182 - crtime: time::UNIX_EPOCH, 237 + atime: timestamp, 238 + mtime: timestamp, 239 + ctime: timestamp, 240 + crtime: timestamp, 183 241 kind: fuser::FileType::RegularFile, 184 242 perm: 0o644, 185 243 nlink: 1, ··· 187 245 gid: 20, 188 246 rdev: 0, 189 247 flags: 0, 190 - blksize: 512, 248 + blksize: BLKSIZE, 191 249 } 192 250 } 193 251 _ => panic!("zero"), ··· 206 264 _fh: Option<u64>, 207 265 reply: fuser::ReplyAttr, 208 266 ) { 209 - if (ino as usize) < self.inodes.len() { 267 + let len = self.inodes.lock().unwrap().len(); 268 + if (ino as usize) < len { 210 269 reply.attr(&TTL, &self.attr(ino as u64)) 211 270 } else { 212 271 reply.error(libc::ENOENT) ··· 221 280 offset: i64, 222 281 mut reply: fuser::ReplyDirectory, 223 282 ) { 224 - match self.inodes.get_index(ino as usize) { 283 + let inodes = self.inodes.lock().unwrap(); 284 + match inodes.get_index(ino as usize) { 225 285 Some(PdsFsEntry::Root) => { 226 286 let entries: Vec<_> = vec![(ino, ".".to_string()), (ino, "..".to_string())] 227 287 .into_iter() 228 - .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| { 288 + .chain(inodes.iter().enumerate().filter_map(|(i, e)| { 229 289 if let PdsFsEntry::Did(did) = e { 230 290 Some((i as u64, did.clone())) 231 291 } else { ··· 233 293 } 234 294 })) 235 295 .collect(); 296 + drop(inodes); 236 297 237 298 for (index, (inode_num, name)) in 238 299 entries.into_iter().enumerate().skip(offset as usize) ··· 249 310 reply.ok() 250 311 } 251 312 Some(PdsFsEntry::Did(_)) => { 252 - let entries = vec![(ino, ".".to_string()), (1, "..".to_string())] 313 + let entries: Vec<_> = vec![(ino, ".".to_string()), (1, "..".to_string())] 253 314 .into_iter() 254 - .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| { 315 + .chain(inodes.iter().enumerate().filter_map(|(i, e)| { 255 316 if let PdsFsEntry::Collection(col) = e { 256 317 if col.parent == ino as usize { 257 318 Some((i as u64, col.nsid.clone())) ··· 262 323 None 263 324 } 264 325 })) 265 - .into_iter() 266 - .enumerate() 267 - .skip(offset as usize); 326 + .collect(); 327 + drop(inodes); 268 328 269 - for (index, (inode_num, name)) in entries { 329 + for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) { 270 330 let full = reply.add( 271 331 inode_num, 272 332 (index + 1) as i64, ··· 285 345 reply.ok(); 286 346 } 287 347 Some(PdsFsEntry::Collection(c)) => { 288 - let entries = [(ino, ".".to_string()), (c.parent as u64, "..".to_string())] 348 + let parent_ino = c.parent; 349 + let entries: Vec<_> = [(ino, ".".to_string()), (parent_ino as u64, "..".to_string())] 289 350 .into_iter() 290 - .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| { 351 + .chain(inodes.iter().enumerate().filter_map(|(i, e)| { 291 352 if let PdsFsEntry::Record(record) = e { 292 353 if record.parent == ino as usize { 293 - Some((i as u64, record.rkey.clone())) 354 + Some((i as u64, format!("{}.json", record.rkey))) 294 355 } else { 295 356 None 296 357 } ··· 298 359 None 299 360 } 300 361 })) 301 - .into_iter() 302 - .enumerate() 303 - .skip(offset as usize); 362 + .collect(); 363 + drop(inodes); 304 364 305 - for (index, (inode_num, name)) in entries { 365 + for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) { 306 366 let full = reply.add( 307 367 inode_num, 308 368 (index + 1) as i64, ··· 320 380 321 381 reply.ok() 322 382 } 323 - _ => reply.error(libc::ENOENT), 383 + _ => { 384 + drop(inodes); 385 + reply.error(libc::ENOENT) 386 + } 324 387 } 325 388 } 326 389 ··· 331 394 name: &std::ffi::OsStr, 332 395 reply: fuser::ReplyEntry, 333 396 ) { 334 - match self.inodes.get_index(parent as usize) { 397 + let inodes = self.inodes.lock().unwrap(); 398 + match inodes.get_index(parent as usize) { 335 399 Some(PdsFsEntry::Root) => { 336 400 let did = PdsFsEntry::Did(name.to_string_lossy().to_string()); 337 - if let Some(ino) = self.inodes.get_index_of(&did) { 401 + if let Some(ino) = inodes.get_index_of(&did) { 402 + drop(inodes); 338 403 reply.entry(&TTL, &self.attr(ino as u64), 0); 339 404 } else { 405 + drop(inodes); 340 406 reply.error(libc::ENOENT) 341 407 } 342 408 } ··· 345 411 parent: parent as usize, 346 412 nsid: name.to_string_lossy().to_string(), 347 413 }); 348 - if let Some(ino) = self.inodes.get_index_of(&col) { 414 + if let Some(ino) = inodes.get_index_of(&col) { 415 + drop(inodes); 349 416 reply.entry(&TTL, &self.attr(ino as u64), 0); 350 417 } else { 418 + drop(inodes); 351 419 reply.error(libc::ENOENT) 352 420 } 353 421 } 354 422 Some(PdsFsEntry::Collection(_)) => { 423 + let name_str = name.to_string_lossy(); 424 + let rkey = name_str.strip_suffix(".json").unwrap_or(&name_str).to_string(); 355 425 let record = PdsFsEntry::Record(PdsFsRecord { 356 426 parent: parent as usize, 357 - rkey: name.to_string_lossy().to_string(), 427 + rkey, 358 428 }); 359 - if let Some(ino) = self.inodes.get_index_of(&record) { 429 + if let Some(ino) = inodes.get_index_of(&record) { 430 + drop(inodes); 360 431 reply.entry(&TTL, &self.attr(ino as u64), 0); 361 432 } else { 433 + drop(inodes); 362 434 reply.error(libc::ENOENT) 363 435 } 364 436 } 365 - _ => reply.error(libc::ENOENT), 437 + _ => { 438 + drop(inodes); 439 + reply.error(libc::ENOENT) 440 + } 366 441 } 367 442 } 368 443 ··· 377 452 _lock: Option<u64>, 378 453 reply: fuser::ReplyData, 379 454 ) { 380 - let rt = tokio::runtime::Runtime::new().unwrap(); 381 - if let Some(PdsFsEntry::Record(r)) = self.inodes.get_index(ino as usize) { 382 - let col = self.inodes[r.parent].unwrap_collection(); 383 - let did = self.inodes[col.parent].unwrap_did(); 384 - let repo = &mut self.repos[did]; 455 + let inodes = self.inodes.lock().unwrap(); 456 + if let Some(PdsFsEntry::Record(r)) = inodes.get_index(ino as usize) { 457 + let col = inodes[r.parent].unwrap_collection(); 458 + let did = inodes[col.parent].unwrap_did().clone(); 385 459 let key = format!("{}/{}", col.nsid, r.rkey); 460 + let cache_key = format!("{}/{}", did, key); 461 + drop(inodes); 386 462 387 - if let Ok(Some(val)) = rt.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) { 463 + // Check content cache first (for new records from firehose) 464 + { 465 + let cache = self.content_cache.lock().unwrap(); 466 + if let Some(content) = cache.get(&cache_key) { 467 + reply.data(&content.as_bytes()[offset as usize..]); 468 + return; 469 + } 470 + } 471 + 472 + // Fall back to repo 473 + let mut repos = self.repos.lock().unwrap(); 474 + let repo = &mut repos[&did]; 475 + if let Ok(Some(val)) = self.rt.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) { 388 476 reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]); 389 477 return; 390 478 } 479 + } else { 480 + drop(inodes); 391 481 } 392 482 reply.error(libc::ENOENT); 393 483 }
+52 -32
src/main.rs
··· 1 - #![feature(let_chains)] 2 1 mod client; 3 2 mod error; 3 + mod firehose; 4 4 mod fs; 5 5 mod resolver; 6 6 ··· 13 13 use futures::{StreamExt, stream}; 14 14 use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; 15 15 use std::{ 16 - collections::HashMap, 17 16 io::{Cursor, Write}, 18 17 path::PathBuf, 19 18 sync::Arc, 20 19 }; 21 - use xdg::BaseDirectories; 22 20 23 21 fn main() { 24 22 let rt = tokio::runtime::Runtime::new().unwrap(); ··· 60 58 let id = r.resolve(&h).await?; 61 59 let bytes = cached_download(&id, &b).await?; 62 60 let repo = build_repo(bytes).await?; 63 - Ok::<_, error::Error>((id.did, repo)) 61 + Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo)) 64 62 } 65 63 }) 66 64 .collect::<Vec<_>>(), ··· 69 67 for e in errors { 70 68 eprintln!("{:?}", e.as_ref().unwrap_err()); 71 69 } 72 - let repos = success 70 + let repos_with_pds: Vec<_> = success 73 71 .into_iter() 74 72 .map(|s| s.unwrap()) 75 - .collect::<HashMap<_, _>>(); 73 + .collect(); 76 74 77 75 // construct the fs 78 76 let mut fs = fs::PdsFs::new(); 79 - for (did, repo) in repos { 77 + 78 + // Extract (did, pds) pairs for WebSocket tasks before consuming repos 79 + let did_pds_pairs: Vec<_> = repos_with_pds.iter() 80 + .map(|(did, pds, _)| (did.clone(), pds.clone())) 81 + .collect(); 82 + 83 + // Consume repos_with_pds to add repos to filesystem 84 + for (did, _, repo) in repos_with_pds { 80 85 rt.block_on(fs.add(did, repo)) 81 86 } 82 87 88 + // get shared state for WebSocket tasks 89 + let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state(); 90 + 83 91 // mount 84 - let options = vec![MountOption::RO, MountOption::FSName("pdsfs".to_string())]; 85 - let join_handle = fuser::spawn_mount2(fs, &mountpoint, &options).unwrap(); 92 + let options = vec![ 93 + MountOption::RO, 94 + MountOption::FSName("pdsfs".to_string()), 95 + MountOption::AllowOther, 96 + MountOption::CUSTOM("local".to_string()), 97 + MountOption::CUSTOM("volname=pdsfs".to_string()), 98 + ]; 99 + 100 + // Create session and get notifier for Finder refresh 101 + let session = fuser::Session::new(fs, &mountpoint, &options).unwrap(); 102 + let notifier = session.notifier(); 103 + let _bg = session.spawn().unwrap(); 104 + 105 + // spawn WebSocket subscription tasks for each DID using the runtime handle 106 + let rt_handle = rt.handle().clone(); 107 + for (did, pds) in did_pds_pairs { 108 + let inodes_clone = Arc::clone(&inodes_arc); 109 + let sizes_clone = Arc::clone(&sizes_arc); 110 + let content_cache_clone = Arc::clone(&content_cache_arc); 111 + let notifier_clone = notifier.clone(); 112 + 113 + rt_handle.spawn(async move { 114 + if let Err(e) = firehose::subscribe_to_repo::<atrium_repo::blockstore::CarStore<std::io::Cursor<Vec<u8>>>>( 115 + did, 116 + pds, 117 + inodes_clone, 118 + sizes_clone, 119 + content_cache_clone, 120 + notifier_clone, 121 + ).await { 122 + eprintln!("WebSocket error: {:?}", e); 123 + } 124 + }); 125 + } 86 126 87 127 println!("mounted at {mountpoint:?}"); 88 128 print!("hit enter to unmount and exit..."); ··· 91 131 // Wait for user input 92 132 let mut input = String::new(); 93 133 std::io::stdin().read_line(&mut input).unwrap(); 94 - 95 - join_handle.join(); 96 - std::fs::remove_dir(&mountpoint).unwrap(); 97 134 98 135 println!("unmounted {mountpoint:?}"); 99 136 } ··· 112 149 pb.enable_steady_tick(std::time::Duration::from_millis(100)); 113 150 pb = m.add(pb); 114 151 115 - let dirs = BaseDirectories::new(); 116 - 117 - let dir = dirs.get_data_home().expect("$HOME is absent").join("pdsfs"); 118 - tokio::fs::create_dir_all(&dir).await?; 119 - 120 - let file = dir.join(&id.did); 121 - let exists = std::fs::exists(&file)?; 122 - 123 - let bytes = if !exists { 124 - pb.set_message(format!("downloading CAR file for...{}", id.did)); 125 - download_car_file(id, &pb).await? 126 - } else { 127 - pb.set_message(format!("using cached CAR file for...{}", id.did)); 128 - tokio::fs::read(&file).await? 129 - }; 130 - 131 - // write to disk 132 - if !exists { 133 - tokio::fs::write(&file, &bytes).await?; 134 - } 152 + // Always download fresh - no caching for now to ensure up-to-date data 153 + pb.set_message(format!("downloading CAR file for...{}", id.did)); 154 + let bytes = download_car_file(id, &pb).await?; 135 155 136 156 pb.finish(); 137 157 Ok(bytes)