mount an atproto PDS repository as a FUSE filesystem

Compare changes

Choose any two refs to compare.

+22
.tangled/workflows/build.yml
··· 1 + when: 2 + - event: push 3 + branch: main 4 + 5 + dependencies: 6 + nixpkgs: 7 + - gcc 8 + - cargo 9 + - rustc 10 + - rustfmt 11 + - pkg-config 12 + - fuse3 13 + - openssl 14 + 15 + steps: 16 + - name: build 17 + command: | 18 + export PKG_CONFIG_PATH="${PKG_CONFIG_PATH:+$PKG_CONFIG_PATH:}$(nix build nixpkgs#openssl.dev --no-link --print-out-paths)/lib/pkgconfig" 19 + export PKG_CONFIG_PATH="${PKG_CONFIG_PATH}:$(nix build nixpkgs#fuse3.dev --no-link --print-out-paths)/lib/pkgconfig" 20 + cargo build --all --verbose 21 + 22 +
+17
.tangled/workflows/fmt.yml
··· 1 + when: 2 + - event: push 3 + branch: main 4 + 5 + dependencies: 6 + nixpkgs: 7 + - cargo 8 + - rustc 9 + - rustfmt 10 + 11 + steps: 12 + - name: build 13 + command: | 14 + cargo fmt --check --all --verbose 15 + 16 + 17 +
+101 -7
Cargo.lock
··· 98 98 ] 99 99 100 100 [[package]] 101 + name = "anyhow" 102 + version = "1.0.100" 103 + source = "registry+https://github.com/rust-lang/crates.io-index" 104 + checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" 105 + 106 + [[package]] 101 107 name = "async-channel" 102 108 version = "1.9.0" 103 109 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 327 333 version = "3.19.0" 328 334 source = "registry+https://github.com/rust-lang/crates.io-index" 329 335 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 336 + 337 + [[package]] 338 + name = "byteorder" 339 + version = "1.5.0" 340 + source = "registry+https://github.com/rust-lang/crates.io-index" 341 + checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" 330 342 331 343 [[package]] 332 344 name = "bytes" ··· 669 681 checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" 670 682 dependencies = [ 671 683 "libc", 672 - "windows-sys 0.59.0", 684 + "windows-sys 0.60.2", 673 685 ] 674 686 675 687 [[package]] ··· 991 1003 "idna", 992 1004 "ipnet", 993 1005 "once_cell", 994 - "rand", 1006 + "rand 0.9.2", 995 1007 "ring", 996 1008 "thiserror 2.0.12", 997 1009 "tinyvec", ··· 1013 1025 "moka", 1014 1026 "once_cell", 1015 1027 "parking_lot", 1016 - "rand", 1028 + "rand 0.9.2", 1017 1029 "resolv-conf", 1018 1030 "smallvec", 1019 1031 "thiserror 2.0.12", ··· 1757 1769 name = "pdsfs" 1758 1770 version = "0.1.0" 1759 1771 dependencies = [ 1772 + "anyhow", 1760 1773 "atrium-api", 1761 1774 "atrium-common", 1762 1775 "atrium-identity", ··· 1776 1789 "serde_json", 1777 1790 "thiserror 2.0.12", 1778 1791 "tokio", 1792 + "tokio-tungstenite", 1779 1793 "xdg", 1780 1794 ] 1781 1795 ··· 1889 1903 1890 1904 [[package]] 1891 1905 name = "rand" 1906 + version = "0.8.5" 1907 + source = "registry+https://github.com/rust-lang/crates.io-index" 1908 + checksum = 
"34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" 1909 + dependencies = [ 1910 + "libc", 1911 + "rand_chacha 0.3.1", 1912 + "rand_core 0.6.4", 1913 + ] 1914 + 1915 + [[package]] 1916 + name = "rand" 1892 1917 version = "0.9.2" 1893 1918 source = "registry+https://github.com/rust-lang/crates.io-index" 1894 1919 checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" 1895 1920 dependencies = [ 1896 - "rand_chacha", 1897 - "rand_core", 1921 + "rand_chacha 0.9.0", 1922 + "rand_core 0.9.3", 1923 + ] 1924 + 1925 + [[package]] 1926 + name = "rand_chacha" 1927 + version = "0.3.1" 1928 + source = "registry+https://github.com/rust-lang/crates.io-index" 1929 + checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" 1930 + dependencies = [ 1931 + "ppv-lite86", 1932 + "rand_core 0.6.4", 1898 1933 ] 1899 1934 1900 1935 [[package]] ··· 1904 1939 checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" 1905 1940 dependencies = [ 1906 1941 "ppv-lite86", 1907 - "rand_core", 1942 + "rand_core 0.9.3", 1943 + ] 1944 + 1945 + [[package]] 1946 + name = "rand_core" 1947 + version = "0.6.4" 1948 + source = "registry+https://github.com/rust-lang/crates.io-index" 1949 + checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" 1950 + dependencies = [ 1951 + "getrandom 0.2.16", 1908 1952 ] 1909 1953 1910 1954 [[package]] ··· 2057 2101 "errno", 2058 2102 "libc", 2059 2103 "linux-raw-sys", 2060 - "windows-sys 0.59.0", 2104 + "windows-sys 0.60.2", 2061 2105 ] 2062 2106 2063 2107 [[package]] ··· 2234 2278 ] 2235 2279 2236 2280 [[package]] 2281 + name = "sha1" 2282 + version = "0.10.6" 2283 + source = "registry+https://github.com/rust-lang/crates.io-index" 2284 + checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" 2285 + dependencies = [ 2286 + "cfg-if", 2287 + "cpufeatures", 2288 + "digest", 2289 + ] 2290 + 2291 + [[package]] 2237 2292 name = "sha2" 2238 2293 version = 
"0.10.9" 2239 2294 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2515 2570 ] 2516 2571 2517 2572 [[package]] 2573 + name = "tokio-tungstenite" 2574 + version = "0.24.0" 2575 + source = "registry+https://github.com/rust-lang/crates.io-index" 2576 + checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" 2577 + dependencies = [ 2578 + "futures-util", 2579 + "log", 2580 + "native-tls", 2581 + "tokio", 2582 + "tokio-native-tls", 2583 + "tungstenite", 2584 + ] 2585 + 2586 + [[package]] 2518 2587 name = "tokio-util" 2519 2588 version = "0.7.15" 2520 2589 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2663 2732 checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" 2664 2733 2665 2734 [[package]] 2735 + name = "tungstenite" 2736 + version = "0.24.0" 2737 + source = "registry+https://github.com/rust-lang/crates.io-index" 2738 + checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" 2739 + dependencies = [ 2740 + "byteorder", 2741 + "bytes", 2742 + "data-encoding", 2743 + "http 1.3.1", 2744 + "httparse", 2745 + "log", 2746 + "native-tls", 2747 + "rand 0.8.5", 2748 + "sha1", 2749 + "thiserror 1.0.69", 2750 + "utf-8", 2751 + ] 2752 + 2753 + [[package]] 2666 2754 name = "typenum" 2667 2755 version = "1.18.0" 2668 2756 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2712 2800 "idna", 2713 2801 "percent-encoding", 2714 2802 ] 2803 + 2804 + [[package]] 2805 + name = "utf-8" 2806 + version = "0.7.6" 2807 + source = "registry+https://github.com/rust-lang/crates.io-index" 2808 + checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" 2715 2809 2716 2810 [[package]] 2717 2811 name = "utf8_iter"
+4 -2
Cargo.toml
··· 4 4 edition = "2024" 5 5 6 6 [dependencies] 7 + anyhow = "1.0" 7 8 atrium-api = "0.25.4" 8 9 atrium-common = "0.1.2" 9 10 atrium-identity = "0.1.5" ··· 11 12 atrium-xrpc = "0.12.3" 12 13 atrium-xrpc-client = { version = "0.5.14", features=["isahc"] } 13 14 clap = { version = "4.5.41", features = ["cargo"] } 14 - fuser = "0.15.1" 15 + fuser = { version = "0.15.1", features = ["abi-7-18"] } 15 16 futures = "0.3.31" 16 17 hickory-resolver = "0.25.2" 17 18 indexmap = "2.10.0" ··· 22 23 serde_ipld_dagcbor = "0.6.3" 23 24 serde_json = "1.0.141" 24 25 thiserror = "2.0.12" 25 - tokio = { version = "1.46.1", features = ["fs"] } 26 + tokio = { version = "1.46.1", features = ["fs", "sync", "rt-multi-thread"] } 27 + tokio-tungstenite = { version = "0.24", features = ["native-tls"] } 26 28 xdg = "3.0.0"
+96
flake.lock
··· 1 + { 2 + "nodes": { 3 + "flake-utils": { 4 + "inputs": { 5 + "systems": "systems" 6 + }, 7 + "locked": { 8 + "lastModified": 1731533236, 9 + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", 10 + "owner": "numtide", 11 + "repo": "flake-utils", 12 + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", 13 + "type": "github" 14 + }, 15 + "original": { 16 + "owner": "numtide", 17 + "repo": "flake-utils", 18 + "type": "github" 19 + } 20 + }, 21 + "nixpkgs": { 22 + "locked": { 23 + "lastModified": 1757487488, 24 + "narHash": "sha256-zwE/e7CuPJUWKdvvTCB7iunV4E/+G0lKfv4kk/5Izdg=", 25 + "owner": "NixOS", 26 + "repo": "nixpkgs", 27 + "rev": "ab0f3607a6c7486ea22229b92ed2d355f1482ee0", 28 + "type": "github" 29 + }, 30 + "original": { 31 + "owner": "NixOS", 32 + "ref": "nixos-unstable", 33 + "repo": "nixpkgs", 34 + "type": "github" 35 + } 36 + }, 37 + "nixpkgs_2": { 38 + "locked": { 39 + "lastModified": 1744536153, 40 + "narHash": "sha256-awS2zRgF4uTwrOKwwiJcByDzDOdo3Q1rPZbiHQg/N38=", 41 + "owner": "NixOS", 42 + "repo": "nixpkgs", 43 + "rev": "18dd725c29603f582cf1900e0d25f9f1063dbf11", 44 + "type": "github" 45 + }, 46 + "original": { 47 + "owner": "NixOS", 48 + "ref": "nixpkgs-unstable", 49 + "repo": "nixpkgs", 50 + "type": "github" 51 + } 52 + }, 53 + "root": { 54 + "inputs": { 55 + "flake-utils": "flake-utils", 56 + "nixpkgs": "nixpkgs", 57 + "rust-overlay": "rust-overlay" 58 + } 59 + }, 60 + "rust-overlay": { 61 + "inputs": { 62 + "nixpkgs": "nixpkgs_2" 63 + }, 64 + "locked": { 65 + "lastModified": 1757644300, 66 + "narHash": "sha256-bVIDYz31bCdZId441sqgkImnA7aYr2UQzQlyl+O2DWc=", 67 + "owner": "oxalica", 68 + "repo": "rust-overlay", 69 + "rev": "591c5ae84f066bdfc9797b217df392d58eafd088", 70 + "type": "github" 71 + }, 72 + "original": { 73 + "owner": "oxalica", 74 + "repo": "rust-overlay", 75 + "type": "github" 76 + } 77 + }, 78 + "systems": { 79 + "locked": { 80 + "lastModified": 1681028828, 81 + "narHash": 
"sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", 82 + "owner": "nix-systems", 83 + "repo": "default", 84 + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", 85 + "type": "github" 86 + }, 87 + "original": { 88 + "owner": "nix-systems", 89 + "repo": "default", 90 + "type": "github" 91 + } 92 + } 93 + }, 94 + "root": "root", 95 + "version": 7 96 + }
+92
flake.nix
··· 1 + { 2 + description = "pdsfs - mount an atproto PDS repository as a FUSE filesystem"; 3 + 4 + inputs = { 5 + nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; 6 + rust-overlay.url = "github:oxalica/rust-overlay"; 7 + flake-utils.url = "github:numtide/flake-utils"; 8 + }; 9 + 10 + outputs = { self, nixpkgs, rust-overlay, flake-utils }: 11 + flake-utils.lib.eachDefaultSystem (system: 12 + let 13 + overlays = [ (import rust-overlay) ]; 14 + pkgs = import nixpkgs { 15 + inherit system overlays; 16 + }; 17 + 18 + rustToolchain = 19 + if builtins.pathExists ./rust-toolchain.toml 20 + then pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml 21 + else pkgs.rust-bin.stable.latest.default.override { 22 + extensions = [ "rust-src" "rust-analyzer" ]; 23 + }; 24 + 25 + nativeBuildInputs = with pkgs; [ 26 + rustToolchain 27 + pkg-config 28 + ]; 29 + 30 + buildInputs = with pkgs; [ 31 + fuse3 32 + openssl 33 + ] ++ lib.optionals stdenv.isDarwin [ 34 + darwin.apple_sdk.frameworks.Security 35 + darwin.apple_sdk.frameworks.CoreServices 36 + ]; 37 + 38 + pdsfs = pkgs.rustPlatform.buildRustPackage { 39 + pname = "pdsfs"; 40 + version = "0.1.0"; 41 + 42 + src = ./.; 43 + 44 + cargoLock = { 45 + lockFile = ./Cargo.lock; 46 + }; 47 + 48 + inherit nativeBuildInputs buildInputs; 49 + 50 + # Skip tests that require network access or FUSE capabilities 51 + doCheck = false; 52 + 53 + meta = with pkgs.lib; { 54 + description = "Mount an atproto PDS repository as a FUSE filesystem"; 55 + homepage = "https://github.com/tangled/pdsfs"; # Update with actual repository URL 56 + license = licenses.mit; # Update with actual license 57 + maintainers = [ ]; # Add maintainers if desired 58 + platforms = platforms.linux ++ platforms.darwin; 59 + }; 60 + }; 61 + in 62 + { 63 + packages = { 64 + default = pdsfs; 65 + pdsfs = pdsfs; 66 + }; 67 + 68 + devShells.default = pkgs.mkShell { 69 + inherit buildInputs nativeBuildInputs; 70 + 71 + shellHook = '' 72 + echo "pdsfs development 
environment" 73 + echo "Run 'cargo build' to build the project" 74 + echo "Run 'cargo run -- <handle>' to mount a PDS repository" 75 + echo "" 76 + echo "FUSE development notes:" 77 + echo "- Ensure you have FUSE permissions (add user to 'fuse' group if needed)" 78 + echo "- Use 'fusermount -u <mountpoint>' to unmount if needed" 79 + ''; 80 + }; 81 + 82 + # Allow building with different Rust versions 83 + devShells.rust-nightly = pkgs.mkShell { 84 + buildInputs = buildInputs ++ [ 85 + (pkgs.rust-bin.nightly.latest.default.override { 86 + extensions = [ "rust-src" "rust-analyzer" ]; 87 + }) 88 + pkgs.pkg-config 89 + ]; 90 + }; 91 + }); 92 + }
+165 -6
readme.txt
··· 1 - pdsfs 2 - ----- 1 + pdsfs - mount an atproto PDS repository as a FUSE filesystem. 2 + 3 + a PDS repository[0] contains all data published by a user to 4 + the atmosphere[1]. it is exportable as a CAR 5 + (content-addressable archive) file. pdsfs is a tool that 6 + mounts this CAR file as a readonly-FUSE filesystem, allowing 7 + quick and easy exploration. 8 + 9 + to motivate the need for such a program, we could begin by 10 + mounting a repository: 11 + 12 + 13 + ฮป pdsfs oppi.li 14 + mounted at "mnt" 15 + hit enter to unmount and exit... 16 + 17 + 18 + oppi.li is my handle in the atmosphere. the tool does some 19 + hard work to determine the location of my PDS repository 20 + given my handle, but that is not important. let's have a look 21 + around: 22 + 23 + 24 + ฮป ls mnt/ 25 + did:plc:qfpnj4og54vl56wngdriaxug/ 26 + 27 + 28 + the did:plc:stuff is my DID. let's dig deeper: 29 + 30 + 31 + ฮป ls mnt/did\:plc\:qfpnj4og54vl56wngdriaxug/ 32 + app.bsky.actor.profile/ place.stream.chat.message/ 33 + app.bsky.actor.status/ place.stream.chat.profile/ 34 + app.bsky.feed.generator/ place.stream.key/ 35 + app.bsky.feed.like/ place.stream.livestream/ 36 + app.bsky.feed.post/ sh.tangled.actor.profile/ 37 + app.bsky.feed.repost/ sh.tangled.feed.reaction/ 38 + app.bsky.graph.block/ sh.tangled.feed.star/ 39 + app.bsky.graph.follow/ sh.tangled.graph.follow/ 40 + app.rocksky.album/ sh.tangled.knot/ 41 + app.rocksky.artist/ sh.tangled.knot.member/ 42 + . 43 + . 44 + . 45 + 46 + 47 + we have some data from the repository now. these are 48 + "collections". if i want to publish a post to bluesky, i 49 + would write content to the "app.bsky.feed.post" collection 50 + in my PDS. this will then be indexed by a bluesky appview 51 + (bsky.app or zeppelin.social to name a few) and show up 52 + under my profile there. 
53 + 54 + pdsfs is kind enough to deserialize the data stored (as 55 + CBOR) in the PDS repository to JSON: 56 + 57 + 58 + ฮป cat sh.tangled.repo/3ljidbevrjh22 | jq 59 + { 60 + "$type": "sh.tangled.repo", 61 + "addedAt": "2025-03-03T16:04:13Z", 62 + "knot": "knot1.tangled.sh", 63 + "name": "hello-world", 64 + "owner": "did:plc:3danwc67lo7obz2fmdg6jxcr" 65 + } 66 + 67 + 68 + thanks pdsfs! 69 + 70 + i publish my music listening habits to my PDS to the 71 + "app.rocksky.scrobble" collection, because rocksky[4] 72 + recognizes and indexes this collection. i have wired up my 73 + personal navidrome instance to write data of this form into 74 + my PDS every time i listen to a track. here are my top 75 + artists in order: 76 + 77 + 78 + ฮป cat app.rocksky.scrobble/* | jq -r '.artist' | sort | uniq -c | sort -nr 79 + 117 Thank You Scientist 80 + 45 FKJ 81 + 34 Covet 82 + 33 VOLA 83 + 23 Sam Cooke 84 + 22 Dark Tranquillity 85 + 21 Piero Piccioni 86 + 12 Bloodywood 87 + 11 Frank Sinatra 88 + 10 Dream Theater 89 + 90 + 91 + it is true, i love sam cooke. 92 + 93 + pdsfs allows mounting multiple repositories at a time. allow 94 + me to introduce my friends: 95 + 96 + 97 + ฮป pdsfs icyphox.sh anil.recoil.org steveklabnik.com tangled.sh 98 + using cached CAR file for...did:plc:hwevmowznbiukdf6uk5dwrrq 99 + using cached CAR file for...did:plc:nhyitepp3u4u6fcfboegzcjw 100 + download complete for...did:plc:3danwc67lo7obz2fmdg6jxcr 101 + download complete for...did:plc:wshs7t2adsemcrrd4snkeqli 102 + mounted at "mnt" 103 + hit enter to unmount and exit... 104 + 105 + 106 + # -- in a separate shell -- 107 + 108 + 109 + ฮป cat ./mnt/*/app.bsky.actor.profile/* \ 110 + | jq -r '"\(.displayName)\n\(.description)\n---"' \ 111 + | sed '/^$/d' 112 + Steve Klabnik 113 + #rustlang, #jj-vcs, atproto, shitposts, urbanism. I 114 + contain multitudes. Working on #ruelang but just for 115 + fun. Currently in Austin, TX, but from Pittsburgh. 116 + Previously in Bushwick, the Mission, LA. 
117 + --- 118 + Anirudh Oppiliappan 119 + building @tangled.sh โ€” code collaboration platform built 120 + on atproto helsinki, finland ยท https://anirudh.fi ยท 121 + (somewhat) effective altruist 122 + --- 123 + Anil Madhavapeddy 124 + Professor of Planetary Computing at the University of 125 + Cambridge @cst.cam.ac.uk, where I co-lead the 126 + @eeg.cl.cam.ac.uk, and am also to found at 127 + @conservation.cam.ac.uk. Homepage at 128 + https://anil.recoil.org 129 + --- 130 + Tangled 131 + https://tangled.sh is a git collaboration platform built 132 + on atproto. Social coding, but for real this time! 133 + Discord: chat.tangled.sh IRC: #tangled @ libera.chat 134 + Built by @oppi.li & @icyphox.sh 135 + --- 3 136 4 - mount an atproto PDS repository as a fuse3 filesystem. 5 137 138 + all my friends use tangled.sh, which requires them to 139 + publish their ssh public key to their PDii (PDSes?). perhaps 140 + i would like to add their keys to my allowed_signers file to 141 + verify their commit signatures: 6 142 7 - usage 8 - ----- 9 143 10 - cargo run -- <DID/handle> 144 + ฮป for dir in ./*/sh.tangled.publicKey; 145 + do cat $dir/$(ls -r $dir | head -n1) | jq -r '.key'; 146 + done | tee allowed_signers 147 + ssh-rsa AAAAB3NzaC1yc2EAAA...dHPqc= steveklabnik@DESKTOP-VV370NK 148 + ssh-ed25519 AAAAC3NzaC1lZD...g9bAdk icy@wyndle 149 + ssh-ed25519 AAAAC3NzaC1lZD...BqlM1u anil@recoil.org 150 + 151 + 152 + 153 + --- 154 + 155 + 156 + FUSE is quite liberating in that it allows you to represent 157 + anything as a filesystem. when applications like ls and cat 158 + are executed, the system calls to open and read are rerouted 159 + to your custom fs implementation (pdsfs in this case). the 160 + custom fs implementation is free to do as it pleases, in fact 161 + the first iteration of pdsfs accessed the network for each 162 + open/read call to fetch live data from PDii. 
163 + 164 + 165 + [0]: https://atproto.com/guides/data-repos 166 + [1]: https://atproto.com 167 + [2]: https://docs.bsky.app/docs/api/com-atproto-sync-get-repo 168 + [3]: https://tangled.sh/keys/oppi.li 169 + [4]: https://rocksky.app
+6
src/error.rs
··· 2 2 pub enum Error { 3 3 #[error("atproto error: {0}")] 4 4 GetRepo(#[from] atrium_xrpc::Error<atrium_api::com::atproto::sync::get_repo::Error>), 5 + #[error("repo build error: {0}")] 6 + Repo(#[from] atrium_repo::repo::Error), 7 + #[error("car store error: {0}")] 8 + Car(#[from] atrium_repo::blockstore::CarError), 9 + #[error("identity error: {0}")] 10 + Identity(#[from] atrium_identity::Error), 5 11 #[error("io error: {0}")] 6 12 Io(#[from] std::io::Error), 7 13 }
+318
src/firehose.rs
··· 1 + use anyhow::{anyhow, Result}; 2 + use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID}; 3 + use atrium_api::client::AtpServiceClient; 4 + use atrium_api::com; 5 + use atrium_api::types; 6 + use atrium_xrpc_client::isahc::IsahcClient; 7 + use futures::StreamExt; 8 + use ipld_core::ipld::Ipld; 9 + use std::io::Cursor; 10 + use std::sync::{Arc, Mutex}; 11 + use tokio_tungstenite::connect_async; 12 + use tokio_tungstenite::tungstenite::Message; 13 + 14 + use crate::fs::{PdsFsCollection, PdsFsEntry, PdsFsRecord}; 15 + use indexmap::{IndexMap, IndexSet}; 16 + 17 + /// Frame header types for WebSocket messages 18 + #[derive(Debug, Clone, PartialEq, Eq)] 19 + enum FrameHeader { 20 + Message(Option<String>), 21 + Error, 22 + } 23 + 24 + impl TryFrom<Ipld> for FrameHeader { 25 + type Error = anyhow::Error; 26 + 27 + fn try_from(value: Ipld) -> Result<Self> { 28 + if let Ipld::Map(map) = value { 29 + if let Some(Ipld::Integer(i)) = map.get("op") { 30 + match i { 31 + 1 => { 32 + let t = if let Some(Ipld::String(s)) = map.get("t") { 33 + Some(s.clone()) 34 + } else { 35 + None 36 + }; 37 + return Ok(FrameHeader::Message(t)); 38 + } 39 + -1 => return Ok(FrameHeader::Error), 40 + _ => {} 41 + } 42 + } 43 + } 44 + Err(anyhow!("invalid frame type")) 45 + } 46 + } 47 + 48 + /// Frame types for parsed WebSocket messages 49 + #[derive(Debug, Clone, PartialEq, Eq)] 50 + pub enum Frame { 51 + Message(Option<String>, MessageFrame), 52 + Error(ErrorFrame), 53 + } 54 + 55 + #[derive(Debug, Clone, PartialEq, Eq)] 56 + pub struct MessageFrame { 57 + pub body: Vec<u8>, 58 + } 59 + 60 + #[derive(Debug, Clone, PartialEq, Eq)] 61 + pub struct ErrorFrame {} 62 + 63 + impl TryFrom<&[u8]> for Frame { 64 + type Error = anyhow::Error; 65 + 66 + fn try_from(value: &[u8]) -> Result<Self> { 67 + let mut cursor = Cursor::new(value); 68 + let (left, right) = match serde_ipld_dagcbor::from_reader::<Ipld, _>(&mut cursor) { 69 + Err(serde_ipld_dagcbor::DecodeError::TrailingData) => { 
70 + value.split_at(cursor.position() as usize) 71 + } 72 + _ => { 73 + return Err(anyhow!("invalid frame type")); 74 + } 75 + }; 76 + let header = FrameHeader::try_from(serde_ipld_dagcbor::from_slice::<Ipld>(left)?)?; 77 + if let FrameHeader::Message(t) = &header { 78 + Ok(Frame::Message(t.clone(), MessageFrame { body: right.to_vec() })) 79 + } else { 80 + Ok(Frame::Error(ErrorFrame {})) 81 + } 82 + } 83 + } 84 + 85 + /// Subscribe to a repo's firehose and update inodes on changes 86 + pub async fn subscribe_to_repo<R>( 87 + did: String, 88 + pds: String, 89 + inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>, 90 + sizes: Arc<Mutex<IndexMap<usize, u64>>>, 91 + content_cache: Arc<Mutex<IndexMap<String, String>>>, 92 + notifier: fuser::Notifier, 93 + ) -> Result<()> 94 + where 95 + R: atrium_repo::blockstore::AsyncBlockStoreRead, 96 + { 97 + // Strip https:// or http:// prefix from PDS URL if present 98 + let pds_host = pds.trim_start_matches("https://").trim_start_matches("http://"); 99 + let url = format!("wss://{}/xrpc/{}", pds_host, NSID); 100 + println!("Connecting to firehose: {}", url); 101 + 102 + let (mut stream, _) = connect_async(url).await?; 103 + println!("Connected to firehose for {}", did); 104 + 105 + loop { 106 + match stream.next().await { 107 + Some(Ok(Message::Binary(data))) => { 108 + if let Ok(Frame::Message(Some(t), msg)) = Frame::try_from(data.as_slice()) { 109 + if t.as_str() == "#commit" { 110 + if let Ok(commit) = serde_ipld_dagcbor::from_reader::<Commit, _>(msg.body.as_slice()) { 111 + // Only process commits for our DID 112 + if commit.repo.as_str() == did { 113 + if let Err(e) = handle_commit(&commit, &inodes, &sizes, &content_cache, &did, &pds, &notifier).await { 114 + eprintln!("Error handling commit: {:?}", e); 115 + } 116 + } 117 + } 118 + } 119 + } 120 + } 121 + Some(Ok(_)) => {} // Ignore other message types 122 + Some(Err(e)) => { 123 + eprintln!("WebSocket error: {}", e); 124 + break; 125 + } 126 + None => { 127 + eprintln!("WebSocket 
closed"); 128 + break; 129 + } 130 + } 131 + } 132 + 133 + Ok(()) 134 + } 135 + 136 + /// Handle a commit by updating the inode tree and notifying Finder 137 + async fn handle_commit( 138 + commit: &Commit, 139 + inodes: &Arc<Mutex<IndexSet<PdsFsEntry>>>, 140 + sizes: &Arc<Mutex<IndexMap<usize, u64>>>, 141 + content_cache: &Arc<Mutex<IndexMap<String, String>>>, 142 + did: &str, 143 + pds: &str, 144 + notifier: &fuser::Notifier, 145 + ) -> Result<()> { 146 + // Find the DID inode 147 + let did_entry = PdsFsEntry::Did(did.to_string()); 148 + let did_inode = { 149 + let inodes_lock = inodes.lock().unwrap(); 150 + inodes_lock.get_index_of(&did_entry) 151 + }; 152 + 153 + let Some(did_inode) = did_inode else { 154 + return Err(anyhow!("DID not found in inodes")); 155 + }; 156 + 157 + for op in &commit.ops { 158 + let Some((collection, rkey)) = op.path.split_once('/') else { 159 + continue; 160 + }; 161 + 162 + match op.action.as_str() { 163 + "create" => { 164 + // Fetch the record from PDS 165 + let record_key = format!("{}/{}", collection, rkey); 166 + let cache_key = format!("{}/{}", did, record_key); 167 + 168 + // Fetch record content from PDS 169 + match fetch_record(pds, did, collection, rkey).await { 170 + Ok(content) => { 171 + let content_len = content.len() as u64; 172 + 173 + // Add the record to inodes 174 + let (collection_inode, record_inode) = { 175 + let mut inodes_lock = inodes.lock().unwrap(); 176 + 177 + // Ensure collection exists 178 + let collection_entry = PdsFsEntry::Collection(PdsFsCollection { 179 + parent: did_inode, 180 + nsid: collection.to_string(), 181 + }); 182 + let (collection_inode, _) = inodes_lock.insert_full(collection_entry); 183 + 184 + // Add the record 185 + let record_entry = PdsFsEntry::Record(PdsFsRecord { 186 + parent: collection_inode, 187 + rkey: rkey.to_string(), 188 + }); 189 + let (record_inode, _) = inodes_lock.insert_full(record_entry); 190 + (collection_inode, record_inode) 191 + }; 192 + 193 + // Cache the content 
and size 194 + content_cache.lock().unwrap().insert(cache_key, content); 195 + sizes.lock().unwrap().insert(record_inode, content_len); 196 + 197 + // Notify Finder about the new file (release lock first) 198 + let filename = format!("{}.json", rkey); 199 + if let Err(e) = notifier.inval_entry(collection_inode as u64, filename.as_ref()) { 200 + eprintln!("Failed to invalidate entry for {}: {}", filename, e); 201 + } 202 + 203 + println!("Created: {}/{}", collection, rkey); 204 + } 205 + Err(e) => { 206 + eprintln!("Failed to fetch record {}/{}: {}", collection, rkey, e); 207 + } 208 + } 209 + } 210 + "delete" => { 211 + // Get inodes before removing 212 + let (collection_inode_opt, child_inode_opt) = { 213 + let mut inodes_lock = inodes.lock().unwrap(); 214 + 215 + // Find the collection 216 + let collection_entry = PdsFsEntry::Collection(PdsFsCollection { 217 + parent: did_inode, 218 + nsid: collection.to_string(), 219 + }); 220 + let collection_inode = inodes_lock.get_index_of(&collection_entry); 221 + 222 + // Find and remove the record 223 + let child_inode = if let Some(coll_ino) = collection_inode { 224 + let record_entry = PdsFsEntry::Record(PdsFsRecord { 225 + parent: coll_ino, 226 + rkey: rkey.to_string(), 227 + }); 228 + let child_ino = inodes_lock.get_index_of(&record_entry); 229 + inodes_lock.shift_remove(&record_entry); 230 + child_ino 231 + } else { 232 + None 233 + }; 234 + 235 + (collection_inode, child_inode) 236 + }; 237 + 238 + // Notify Finder about the deletion (release lock first) 239 + if let (Some(coll_ino), Some(child_ino)) = (collection_inode_opt, child_inode_opt) { 240 + // Remove from caches 241 + sizes.lock().unwrap().shift_remove(&child_ino); 242 + let cache_key = format!("{}/{}/{}", did, collection, rkey); 243 + content_cache.lock().unwrap().shift_remove(&cache_key); 244 + 245 + let filename = format!("{}.json", rkey); 246 + if let Err(e) = notifier.delete(coll_ino as u64, child_ino as u64, filename.as_ref()) { 247 + eprintln!("Failed 
to notify deletion for {}: {}", filename, e); 248 + } 249 + } 250 + 251 + println!("Deleted: {}/{}", collection, rkey); 252 + } 253 + "update" => { 254 + // For updates, invalidate the inode so content is re-fetched 255 + let record_inode_opt = { 256 + let inodes_lock = inodes.lock().unwrap(); 257 + let collection_entry = PdsFsEntry::Collection(PdsFsCollection { 258 + parent: did_inode, 259 + nsid: collection.to_string(), 260 + }); 261 + 262 + if let Some(collection_inode) = inodes_lock.get_index_of(&collection_entry) { 263 + let record_entry = PdsFsEntry::Record(PdsFsRecord { 264 + parent: collection_inode, 265 + rkey: rkey.to_string(), 266 + }); 267 + inodes_lock.get_index_of(&record_entry) 268 + } else { 269 + None 270 + } 271 + }; 272 + 273 + // Notify Finder to invalidate the inode (release lock first) 274 + if let Some(record_ino) = record_inode_opt { 275 + // Clear caches so content is recalculated 276 + sizes.lock().unwrap().shift_remove(&record_ino); 277 + let cache_key = format!("{}/{}/{}", did, collection, rkey); 278 + content_cache.lock().unwrap().shift_remove(&cache_key); 279 + 280 + // Invalidate the entire inode (metadata and all data) 281 + if let Err(e) = notifier.inval_inode(record_ino as u64, 0, 0) { 282 + eprintln!("Failed to invalidate inode for {}/{}: {}", collection, rkey, e); 283 + } 284 + } 285 + 286 + println!("Updated: {}/{}", collection, rkey); 287 + } 288 + _ => {} 289 + } 290 + } 291 + 292 + Ok(()) 293 + } 294 + 295 + /// Fetch a record from the PDS 296 + async fn fetch_record(pds: &str, did: &str, collection: &str, rkey: &str) -> Result<String> { 297 + let client = AtpServiceClient::new(IsahcClient::new(pds)); 298 + let did = types::string::Did::new(did.to_string()).map_err(|e| anyhow!(e))?; 299 + let collection_nsid = types::string::Nsid::new(collection.to_string()).map_err(|e| anyhow!(e))?; 300 + let record_key = types::string::RecordKey::new(rkey.to_string()).map_err(|e| anyhow!(e))?; 301 + 302 + let response = client 303 + 
.service 304 + .com 305 + .atproto 306 + .repo 307 + .get_record(com::atproto::repo::get_record::Parameters::from( 308 + com::atproto::repo::get_record::ParametersData { 309 + cid: None, 310 + collection: collection_nsid, 311 + repo: types::string::AtIdentifier::Did(did), 312 + rkey: record_key, 313 + } 314 + )) 315 + .await?; 316 + 317 + Ok(serde_json::to_string_pretty(&response.value)?) 318 + }
+282 -93
src/fs.rs
··· 1 - use std::{collections::BTreeMap, time}; 1 + use std::sync::{Arc, Mutex}; 2 + use std::time::{self, SystemTime, UNIX_EPOCH, Duration}; 2 3 3 4 use atrium_repo::{Repository, blockstore::AsyncBlockStoreRead}; 4 5 use futures::StreamExt; 5 - use indexmap::IndexSet; 6 + use indexmap::{IndexMap, IndexSet}; 7 + 8 + type Inode = usize; 9 + 10 + /// Decode a TID (timestamp identifier) to get the timestamp in microseconds since Unix epoch 11 + fn tid_to_timestamp(tid: &str) -> Option<SystemTime> { 12 + const S32_CHAR: &[u8] = b"234567abcdefghijklmnopqrstuvwxyz"; 13 + 14 + if tid.len() != 13 { 15 + return None; 16 + } 17 + 18 + let mut value: u64 = 0; 19 + for ch in tid.chars() { 20 + let pos = S32_CHAR.iter().position(|&c| c as char == ch)?; 21 + // Big-endian: first character is most significant 22 + value = (value << 5) | (pos as u64); 23 + } 24 + 25 + // Extract timestamp from upper bits (shifted by 10) 26 + let micros = value >> 10; 27 + 28 + UNIX_EPOCH.checked_add(Duration::from_micros(micros)) 29 + } 30 + 6 31 pub struct PdsFs<R> { 7 - repo: Repository<R>, 8 - inodes: IndexSet<PdsFsEntry>, 32 + repos: Arc<Mutex<IndexMap<String, Repository<R>>>>, 33 + inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>, 34 + sizes: Arc<Mutex<IndexMap<Inode, u64>>>, 35 + content_cache: Arc<Mutex<IndexMap<String, String>>>, 36 + rt: tokio::runtime::Runtime, 9 37 } 10 38 11 39 #[derive(Debug, Clone, PartialEq, Eq, Hash)] 12 40 pub enum PdsFsEntry { 13 41 Zero, 14 42 Root, 15 - Collection(String), 43 + Did(String), 44 + Collection(PdsFsCollection), 16 45 Record(PdsFsRecord), 17 46 } 18 47 48 + impl PdsFsEntry { 49 + fn as_collection(&self) -> Option<&PdsFsCollection> { 50 + match &self { 51 + Self::Collection(c) => Some(c), 52 + _ => None, 53 + } 54 + } 55 + 56 + fn as_did(&self) -> Option<&String> { 57 + match &self { 58 + Self::Did(d) => Some(d), 59 + _ => None, 60 + } 61 + } 62 + 63 + fn unwrap_collection(&self) -> &PdsFsCollection { 64 + self.as_collection().unwrap() 65 + } 66 + 67 + fn 
unwrap_did(&self) -> &String { 68 + self.as_did().unwrap() 69 + } 70 + } 71 + 72 + #[derive(Debug, Clone, PartialEq, Eq, Hash)] 73 + pub struct PdsFsCollection { 74 + pub parent: Inode, 75 + pub nsid: String, 76 + } 77 + 19 78 #[derive(Debug, Clone, PartialEq, Eq, Hash)] 20 79 pub struct PdsFsRecord { 21 - collection: String, 22 - rkey: String, 80 + pub parent: Inode, 81 + pub rkey: String, 23 82 } 24 83 25 - impl PdsFsRecord { 26 - fn key(&self) -> String { 27 - format!("{}/{}", self.collection, self.rkey) 28 - } 29 - } 84 + // impl PdsFsRecord { 85 + // fn key(&self) -> String { 86 + // format!("{}/{}", self.collection, self.rkey) 87 + // } 88 + // } 30 89 31 90 const TTL: time::Duration = time::Duration::from_secs(300); 32 91 const BLKSIZE: u32 = 512; ··· 53 112 where 54 113 R: AsyncBlockStoreRead, 55 114 { 56 - pub async fn new(mut repo: Repository<R>) -> Self { 115 + pub fn new() -> Self { 116 + PdsFs { 117 + repos: Arc::new(Mutex::new(Default::default())), 118 + inodes: Arc::new(Mutex::new(IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]))), 119 + sizes: Arc::new(Mutex::new(Default::default())), 120 + content_cache: Arc::new(Mutex::new(Default::default())), 121 + rt: tokio::runtime::Runtime::new().unwrap(), 122 + } 123 + } 124 + 125 + pub fn get_shared_state(&self) -> (Arc<Mutex<IndexMap<String, Repository<R>>>>, Arc<Mutex<IndexSet<PdsFsEntry>>>, Arc<Mutex<IndexMap<Inode, u64>>>, Arc<Mutex<IndexMap<String, String>>>) { 126 + (Arc::clone(&self.repos), Arc::clone(&self.inodes), Arc::clone(&self.sizes), Arc::clone(&self.content_cache)) 127 + } 128 + 129 + pub async fn add(&mut self, did: String, mut repo: Repository<R>) { 57 130 let mut mst = repo.tree(); 58 131 59 - // collect all keys and group by collection 60 - let mut keys = Box::pin(mst.keys()); 61 - let mut collections: BTreeMap<String, Vec<PdsFsRecord>> = BTreeMap::new(); 132 + let did_inode = { 133 + let mut inodes = self.inodes.lock().unwrap(); 134 + let (did_inode, _) = 
inodes.insert_full(PdsFsEntry::Did(did.clone())); 135 + did_inode 136 + }; 62 137 138 + let mut keys = Box::pin(mst.keys()); 63 139 while let Some(Ok(key)) = keys.next().await { 64 - if let Some((collection, rkey)) = key.split_once("/") { 65 - let record = PdsFsRecord { 66 - collection: collection.to_owned(), 140 + if let Some((collection_name, rkey)) = key.split_once("/") { 141 + let mut inodes = self.inodes.lock().unwrap(); 142 + let (collection_inode, _) = inodes.insert_full(PdsFsEntry::Collection(PdsFsCollection { 143 + parent: did_inode, 144 + nsid: collection_name.to_owned(), 145 + })); 146 + 147 + inodes.insert(PdsFsEntry::Record(PdsFsRecord { 148 + parent: collection_inode, 67 149 rkey: rkey.to_owned(), 68 - }; 69 - 70 - collections 71 - .entry(collection.to_owned()) 72 - .or_insert_with(Vec::new) 73 - .push(record); 150 + })); 74 151 } 75 152 } 76 153 77 154 drop(keys); 78 155 drop(mst); 79 156 80 - // build inode structure with proper ordering 81 - let mut inodes = IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]); 82 - 83 - // add collections first 84 - for n in collections.keys().cloned() { 85 - inodes.insert(PdsFsEntry::Collection(n)); 86 - } 87 - 88 - // then add all records grouped by collection 89 - for r in collections.values().flatten().cloned() { 90 - inodes.insert(PdsFsEntry::Record(r)); 91 - } 92 - 93 - println!("constructed {} inodes", inodes.len()); 94 - 95 - PdsFs { repo, inodes } 157 + self.repos.lock().unwrap().insert(did, repo); 96 158 } 97 159 98 160 fn attr(&mut self, ino: u64) -> fuser::FileAttr { 99 - match self.inodes.get_index(ino as usize) { 161 + let inodes = self.inodes.lock().unwrap(); 162 + match inodes.get_index(ino as usize) { 100 163 Some(PdsFsEntry::Root) => ROOTDIR_ATTR, 101 164 Some(PdsFsEntry::Collection(_)) => fuser::FileAttr { 102 165 ino, ··· 115 178 flags: 0, 116 179 blksize: BLKSIZE, 117 180 }, 181 + Some(PdsFsEntry::Did(_)) => fuser::FileAttr { 182 + ino, 183 + size: 0, 184 + blocks: 0, 185 + atime: 
time::UNIX_EPOCH, 186 + mtime: time::UNIX_EPOCH, 187 + ctime: time::UNIX_EPOCH, 188 + crtime: time::UNIX_EPOCH, 189 + kind: fuser::FileType::Directory, 190 + perm: 0o755, 191 + nlink: 2, 192 + uid: 1000, 193 + gid: 1000, 194 + rdev: 0, 195 + flags: 0, 196 + blksize: BLKSIZE, 197 + }, 118 198 Some(PdsFsEntry::Record(r)) => { 119 - let rt = tokio::runtime::Runtime::new().unwrap(); 120 - let size = rt 121 - .block_on(self.repo.get_raw::<ipld_core::ipld::Ipld>(&r.key())) 122 - .ok() 123 - .flatten() 124 - .map_or(0, |v| serde_json::to_string(&v).unwrap().len()) 125 - as u64; 199 + let col = inodes[r.parent].unwrap_collection(); 200 + let did = inodes[col.parent].unwrap_did().clone(); 201 + let rkey = r.rkey.clone(); 202 + let collection_nsid = col.nsid.clone(); 203 + drop(inodes); 204 + 205 + // Check cache first 206 + let size = { 207 + let sizes = self.sizes.lock().unwrap(); 208 + if let Some(&cached_size) = sizes.get(&(ino as usize)) { 209 + cached_size 210 + } else { 211 + drop(sizes); 212 + // Not in cache, try to fetch from repo 213 + let mut repos = self.repos.lock().unwrap(); 214 + let repo = &mut repos[&did]; 215 + let key = format!("{}/{}", collection_nsid, rkey); 216 + let size = self 217 + .rt 218 + .block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) 219 + .ok() 220 + .flatten() 221 + .map_or(500, |v| serde_json::to_string_pretty(&v).unwrap().len()) 222 + as u64; 223 + // Cache it for next time 224 + self.sizes.lock().unwrap().insert(ino as usize, size); 225 + size 226 + } 227 + }; 126 228 let blocks = ((size as u32 + BLKSIZE - 1) / BLKSIZE) as u64; 229 + 230 + // Decode TID to get creation timestamp 231 + let timestamp = tid_to_timestamp(&rkey).unwrap_or(time::UNIX_EPOCH); 232 + 127 233 fuser::FileAttr { 128 234 ino, 129 235 size, 130 236 blocks, 131 - atime: time::UNIX_EPOCH, 132 - mtime: time::UNIX_EPOCH, 133 - ctime: time::UNIX_EPOCH, 134 - crtime: time::UNIX_EPOCH, 237 + atime: timestamp, 238 + mtime: timestamp, 239 + ctime: timestamp, 240 + crtime: 
timestamp, 135 241 kind: fuser::FileType::RegularFile, 136 242 perm: 0o644, 137 243 nlink: 1, ··· 139 245 gid: 20, 140 246 rdev: 0, 141 247 flags: 0, 142 - blksize: 512, 248 + blksize: BLKSIZE, 143 249 } 144 250 } 145 251 _ => panic!("zero"), ··· 158 264 _fh: Option<u64>, 159 265 reply: fuser::ReplyAttr, 160 266 ) { 161 - if (ino as usize) < self.inodes.len() { 267 + let len = self.inodes.lock().unwrap().len(); 268 + if (ino as usize) < len { 162 269 reply.attr(&TTL, &self.attr(ino as u64)) 163 270 } else { 164 271 reply.error(libc::ENOENT) ··· 173 280 offset: i64, 174 281 mut reply: fuser::ReplyDirectory, 175 282 ) { 176 - match self.inodes.get_index(ino as usize) { 283 + let inodes = self.inodes.lock().unwrap(); 284 + match inodes.get_index(ino as usize) { 177 285 Some(PdsFsEntry::Root) => { 178 286 let entries: Vec<_> = vec![(ino, ".".to_string()), (ino, "..".to_string())] 179 287 .into_iter() 180 - .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| { 181 - if let PdsFsEntry::Collection(name) = e { 182 - Some((i as u64, name.clone())) 288 + .chain(inodes.iter().enumerate().filter_map(|(i, e)| { 289 + if let PdsFsEntry::Did(did) = e { 290 + Some((i as u64, did.clone())) 183 291 } else { 184 292 None 185 293 } 186 294 })) 187 295 .collect(); 296 + drop(inodes); 188 297 189 298 for (index, (inode_num, name)) in 190 299 entries.into_iter().enumerate().skip(offset as usize) ··· 200 309 } 201 310 reply.ok() 202 311 } 203 - Some(PdsFsEntry::Collection(collection_name)) => { 204 - let entries: Vec<_> = vec![ 205 - (ino, ".".to_string()), 206 - (1, "..".to_string()), // Parent is root (inode 1) 207 - ] 208 - .into_iter() 209 - .chain(self.inodes.iter().enumerate().filter_map(|(i, e)| { 210 - if let PdsFsEntry::Record(record) = e { 211 - if record.collection == *collection_name { 212 - Some((i as u64, record.rkey.clone())) 312 + Some(PdsFsEntry::Did(_)) => { 313 + let entries: Vec<_> = vec![(ino, ".".to_string()), (1, "..".to_string())] 314 + .into_iter() 315 + 
.chain(inodes.iter().enumerate().filter_map(|(i, e)| { 316 + if let PdsFsEntry::Collection(col) = e { 317 + if col.parent == ino as usize { 318 + Some((i as u64, col.nsid.clone())) 319 + } else { 320 + None 321 + } 213 322 } else { 214 323 None 215 324 } 216 - } else { 217 - None 325 + })) 326 + .collect(); 327 + drop(inodes); 328 + 329 + for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) { 330 + let full = reply.add( 331 + inode_num, 332 + (index + 1) as i64, 333 + if name.starts_with('.') { 334 + fuser::FileType::Directory 335 + } else { 336 + fuser::FileType::RegularFile 337 + }, 338 + name, 339 + ); 340 + if full { 341 + break; 218 342 } 219 - })) 220 - .collect(); 343 + } 344 + 345 + reply.ok(); 346 + } 347 + Some(PdsFsEntry::Collection(c)) => { 348 + let parent_ino = c.parent; 349 + let entries: Vec<_> = [(ino, ".".to_string()), (parent_ino as u64, "..".to_string())] 350 + .into_iter() 351 + .chain(inodes.iter().enumerate().filter_map(|(i, e)| { 352 + if let PdsFsEntry::Record(record) = e { 353 + if record.parent == ino as usize { 354 + Some((i as u64, format!("{}.json", record.rkey))) 355 + } else { 356 + None 357 + } 358 + } else { 359 + None 360 + } 361 + })) 362 + .collect(); 363 + drop(inodes); 221 364 222 - for (index, (inode_num, name)) in 223 - entries.into_iter().enumerate().skip(offset as usize) 224 - { 225 - if reply.add( 365 + for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) { 366 + let full = reply.add( 226 367 inode_num, 227 368 (index + 1) as i64, 228 369 if name.starts_with('.') { ··· 231 372 fuser::FileType::RegularFile 232 373 }, 233 374 name, 234 - ) { 375 + ); 376 + if full { 235 377 break; 236 378 } 237 379 } 380 + 238 381 reply.ok() 239 382 } 240 - _ => reply.error(libc::ENOENT), 383 + _ => { 384 + drop(inodes); 385 + reply.error(libc::ENOENT) 386 + } 241 387 } 242 388 } 243 389 ··· 248 394 name: &std::ffi::OsStr, 249 395 reply: fuser::ReplyEntry, 250 396 ) { 251 
- match self.inodes.get_index(parent as usize) { 397 + let inodes = self.inodes.lock().unwrap(); 398 + match inodes.get_index(parent as usize) { 252 399 Some(PdsFsEntry::Root) => { 253 - let collection = PdsFsEntry::Collection(name.to_string_lossy().to_string()); 254 - if let Some(ino) = self.inodes.get_index_of(&collection) { 400 + let did = PdsFsEntry::Did(name.to_string_lossy().to_string()); 401 + if let Some(ino) = inodes.get_index_of(&did) { 402 + drop(inodes); 403 + reply.entry(&TTL, &self.attr(ino as u64), 0); 404 + } else { 405 + drop(inodes); 406 + reply.error(libc::ENOENT) 407 + } 408 + } 409 + Some(PdsFsEntry::Did(_)) => { 410 + let col = PdsFsEntry::Collection(PdsFsCollection { 411 + parent: parent as usize, 412 + nsid: name.to_string_lossy().to_string(), 413 + }); 414 + if let Some(ino) = inodes.get_index_of(&col) { 415 + drop(inodes); 255 416 reply.entry(&TTL, &self.attr(ino as u64), 0); 256 417 } else { 418 + drop(inodes); 257 419 reply.error(libc::ENOENT) 258 420 } 259 421 } 260 - Some(PdsFsEntry::Collection(c)) => { 422 + Some(PdsFsEntry::Collection(_)) => { 423 + let name_str = name.to_string_lossy(); 424 + let rkey = name_str.strip_suffix(".json").unwrap_or(&name_str).to_string(); 261 425 let record = PdsFsEntry::Record(PdsFsRecord { 262 - collection: c.to_owned(), 263 - rkey: name.to_string_lossy().to_string(), 426 + parent: parent as usize, 427 + rkey, 264 428 }); 265 - if let Some(ino) = self.inodes.get_index_of(&record) { 429 + if let Some(ino) = inodes.get_index_of(&record) { 430 + drop(inodes); 266 431 reply.entry(&TTL, &self.attr(ino as u64), 0); 267 432 } else { 433 + drop(inodes); 268 434 reply.error(libc::ENOENT) 269 435 } 270 436 } 271 - _ => reply.error(libc::ENOENT), 437 + _ => { 438 + drop(inodes); 439 + reply.error(libc::ENOENT) 440 + } 272 441 } 273 442 } 274 443 ··· 283 452 _lock: Option<u64>, 284 453 reply: fuser::ReplyData, 285 454 ) { 286 - let rt = tokio::runtime::Runtime::new().unwrap(); 287 - if let 
Some(PdsFsEntry::Record(r)) = self.inodes.get_index(ino as usize) 288 - && let Ok(Some(val)) = rt.block_on(self.repo.get_raw::<ipld_core::ipld::Ipld>(&r.key())) 289 - { 290 - reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]); 455 + let inodes = self.inodes.lock().unwrap(); 456 + if let Some(PdsFsEntry::Record(r)) = inodes.get_index(ino as usize) { 457 + let col = inodes[r.parent].unwrap_collection(); 458 + let did = inodes[col.parent].unwrap_did().clone(); 459 + let key = format!("{}/{}", col.nsid, r.rkey); 460 + let cache_key = format!("{}/{}", did, key); 461 + drop(inodes); 462 + 463 + // Check content cache first (for new records from firehose) 464 + { 465 + let cache = self.content_cache.lock().unwrap(); 466 + if let Some(content) = cache.get(&cache_key) { 467 + reply.data(&content.as_bytes()[offset as usize..]); 468 + return; 469 + } 470 + } 471 + 472 + // Fall back to repo 473 + let mut repos = self.repos.lock().unwrap(); 474 + let repo = &mut repos[&did]; 475 + if let Ok(Some(val)) = self.rt.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) { 476 + reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]); 477 + return; 478 + } 291 479 } else { 292 - reply.error(libc::ENOENT); 480 + drop(inodes); 293 481 } 482 + reply.error(libc::ENOENT); 294 483 } 295 484 }
+113 -50
src/main.rs
··· 1 - #![feature(let_chains)] 2 1 mod client; 3 2 mod error; 3 + mod firehose; 4 4 mod fs; 5 5 mod resolver; 6 6 ··· 10 10 use atrium_repo::{Repository, blockstore::CarStore}; 11 11 use atrium_xrpc_client::isahc::IsahcClient; 12 12 use fuser::MountOption; 13 - use indicatif::{ProgressBar, ProgressStyle}; 13 + use futures::{StreamExt, stream}; 14 + use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; 14 15 use std::{ 15 16 io::{Cursor, Write}, 16 17 path::PathBuf, 18 + sync::Arc, 17 19 }; 18 - use xdg::BaseDirectories; 19 20 20 21 fn main() { 21 22 let rt = tokio::runtime::Runtime::new().unwrap(); 22 23 let matches = clap::command!() 23 - .arg(clap::Arg::new("handle").index(1)) 24 + .arg( 25 + clap::Arg::new("handles") 26 + .index(1) 27 + .required(true) 28 + .num_args(1..) 29 + .help("One or more handles to download and mount"), 30 + ) 24 31 .arg( 25 32 clap::Arg::new("mountpoint") 26 33 .short('m') ··· 28 35 .value_parser(clap::value_parser!(PathBuf)), 29 36 ) 30 37 .get_matches(); 31 - let handle = matches.get_one::<String>("handle").unwrap(); 38 + let handles = matches 39 + .get_many::<String>("handles") 40 + .unwrap() 41 + .cloned() 42 + .collect::<Vec<_>>(); 32 43 let mountpoint = matches 33 44 .get_one::<PathBuf>("mountpoint") 34 45 .map(ToOwned::to_owned) 35 - .unwrap_or(PathBuf::from(&handle)); 46 + .unwrap_or(PathBuf::from("mnt")); 36 47 let _ = std::fs::create_dir_all(&mountpoint); 37 - let resolver = resolver::id_resolver(); 38 - let id = rt.block_on(async { 39 - resolver 40 - .resolve(&atrium_api::types::string::Handle::new(handle.into()).unwrap()) 41 - .await 42 - .unwrap() 43 - }); 44 48 45 - let pb = ProgressBar::new_spinner(); 46 - pb.set_style( 47 - ProgressStyle::default_spinner() 48 - .template("{spinner:.green} [{elapsed_precise}] {msg}") 49 - .unwrap() 50 - .tick_strings(&["โ ‹", "โ ™", "โ น", "โ ธ", "โ ผ", "โ ด", "โ ฆ", "โ ง", "โ ‡", "โ "]), 49 + let resolver = Arc::new(resolver::id_resolver()); 50 + let bars = 
Arc::new(MultiProgress::new()); 51 + let repos = rt.block_on( 52 + stream::iter(handles) 53 + .then(|handle| { 54 + let h = handle.clone(); 55 + let r = Arc::clone(&resolver); 56 + let b = Arc::clone(&bars); 57 + async move { 58 + let id = r.resolve(&h).await?; 59 + let bytes = cached_download(&id, &b).await?; 60 + let repo = build_repo(bytes).await?; 61 + Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo)) 62 + } 63 + }) 64 + .collect::<Vec<_>>(), 51 65 ); 66 + let (success, errors): (Vec<_>, Vec<_>) = repos.into_iter().partition(|r| r.is_ok()); 67 + for e in errors { 68 + eprintln!("{:?}", e.as_ref().unwrap_err()); 69 + } 70 + let repos_with_pds: Vec<_> = success 71 + .into_iter() 72 + .map(|s| s.unwrap()) 73 + .collect(); 52 74 53 - let bytes = rt.block_on(cached_download(&id, &pb)).unwrap(); 54 - let store = rt.block_on(async { CarStore::open(Cursor::new(bytes)).await.unwrap() }); 55 - let root = store.roots().next().unwrap(); 56 - let repo = rt.block_on(async { Repository::open(store, root).await.unwrap() }); 75 + // construct the fs 76 + let mut fs = fs::PdsFs::new(); 57 77 58 - // construct the fs 59 - let fs = rt.block_on(fs::PdsFs::new(repo)); 78 + // Extract (did, pds) pairs for WebSocket tasks before consuming repos 79 + let did_pds_pairs: Vec<_> = repos_with_pds.iter() 80 + .map(|(did, pds, _)| (did.clone(), pds.clone())) 81 + .collect(); 82 + 83 + // Consume repos_with_pds to add repos to filesystem 84 + for (did, _, repo) in repos_with_pds { 85 + rt.block_on(fs.add(did, repo)) 86 + } 87 + 88 + // get shared state for WebSocket tasks 89 + let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state(); 60 90 61 91 // mount 62 - let options = vec![MountOption::RO, MountOption::FSName("pdsfs".to_string())]; 63 - let join_handle = fuser::spawn_mount2(fs, &mountpoint, &options).unwrap(); 92 + let options = vec![ 93 + MountOption::RO, 94 + MountOption::FSName("pdsfs".to_string()), 95 + MountOption::AllowOther, 96 + 
MountOption::CUSTOM("local".to_string()), 97 + MountOption::CUSTOM("volname=pdsfs".to_string()), 98 + ]; 99 + 100 + // Create session and get notifier for Finder refresh 101 + let session = fuser::Session::new(fs, &mountpoint, &options).unwrap(); 102 + let notifier = session.notifier(); 103 + let _bg = session.spawn().unwrap(); 104 + 105 + // spawn WebSocket subscription tasks for each DID using the runtime handle 106 + let rt_handle = rt.handle().clone(); 107 + for (did, pds) in did_pds_pairs { 108 + let inodes_clone = Arc::clone(&inodes_arc); 109 + let sizes_clone = Arc::clone(&sizes_arc); 110 + let content_cache_clone = Arc::clone(&content_cache_arc); 111 + let notifier_clone = notifier.clone(); 112 + 113 + rt_handle.spawn(async move { 114 + if let Err(e) = firehose::subscribe_to_repo::<atrium_repo::blockstore::CarStore<std::io::Cursor<Vec<u8>>>>( 115 + did, 116 + pds, 117 + inodes_clone, 118 + sizes_clone, 119 + content_cache_clone, 120 + notifier_clone, 121 + ).await { 122 + eprintln!("WebSocket error: {:?}", e); 123 + } 124 + }); 125 + } 64 126 65 127 println!("mounted at {mountpoint:?}"); 66 128 print!("hit enter to unmount and exit..."); ··· 70 132 let mut input = String::new(); 71 133 std::io::stdin().read_line(&mut input).unwrap(); 72 134 73 - join_handle.join(); 74 - std::fs::remove_dir(&mountpoint).unwrap(); 75 - 76 135 println!("unmounted {mountpoint:?}"); 77 136 } 78 137 79 - async fn cached_download(id: &ResolvedIdentity, pb: &ProgressBar) -> Result<Vec<u8>, error::Error> { 80 - let dirs = BaseDirectories::new(); 138 + async fn cached_download( 139 + id: &ResolvedIdentity, 140 + m: &MultiProgress, 141 + ) -> Result<Vec<u8>, error::Error> { 142 + let mut pb = ProgressBar::new_spinner(); 143 + pb.set_style( 144 + ProgressStyle::default_spinner() 145 + .template("{spinner:.green} [{elapsed_precise}] {msg}") 146 + .unwrap() 147 + .tick_strings(&["โ ‹", "โ ™", "โ น", "โ ธ", "โ ผ", "โ ด", "โ ฆ", "โ ง", "โ ‡", "โ "]), 148 + ); 149 + 
pb.enable_steady_tick(std::time::Duration::from_millis(100)); 150 + pb = m.add(pb); 81 151 82 - let dir = dirs.get_data_home().expect("$HOME is absent").join("pdsfs"); 83 - tokio::fs::create_dir_all(&dir).await?; 152 + // Always download fresh - no caching for now to ensure up-to-date data 153 + pb.set_message(format!("downloading CAR file for...{}", id.did)); 154 + let bytes = download_car_file(id, &pb).await?; 84 155 85 - let file = dir.join(&id.did); 86 - let exists = std::fs::exists(&file)?; 87 - 88 - let bytes = if !exists { 89 - pb.set_message(format!("downloading CAR file for\t...\t{}", id.did)); 90 - download_car_file(id, pb).await? 91 - } else { 92 - pb.set_message(format!("using cached CAR file for\t...\t{}", id.did)); 93 - tokio::fs::read(file).await? 94 - }; 95 - 96 - pb.set_message(format!( 97 - "received {} bytes for \t...\t{}", 98 - bytes.len(), 99 - id.did 100 - )); 156 + pb.finish(); 101 157 Ok(bytes) 102 158 } 103 159 ··· 123 179 124 180 Ok(bytes) 125 181 } 182 + 183 + async fn build_repo(bytes: Vec<u8>) -> Result<Repository<CarStore<Cursor<Vec<u8>>>>, error::Error> { 184 + let store = CarStore::open(Cursor::new(bytes)).await?; 185 + let root = store.roots().next().unwrap(); 186 + let repo = Repository::open(store, root).await?; 187 + Ok(repo) 188 + }