+22
.tangled/workflows/build.yml
+22
.tangled/workflows/build.yml
···
1
+
when:
2
+
- event: push
3
+
branch: main
4
+
5
+
dependencies:
6
+
nixpkgs:
7
+
- gcc
8
+
- cargo
9
+
- rustc
10
+
- rustfmt
11
+
- pkg-config
12
+
- fuse3
13
+
- openssl
14
+
15
+
steps:
16
+
- name: build
17
+
command: |
18
+
export PKG_CONFIG_PATH="${PKG_CONFIG_PATH:+$PKG_CONFIG_PATH:}$(nix build nixpkgs#openssl.dev --no-link --print-out-paths)/lib/pkgconfig"
19
+
export PKG_CONFIG_PATH="${PKG_CONFIG_PATH}:$(nix build nixpkgs#fuse3.dev --no-link --print-out-paths)/lib/pkgconfig"
20
+
cargo build --all --verbose
21
+
22
+
+17
.tangled/workflows/fmt.yml
+17
.tangled/workflows/fmt.yml
+101
-7
Cargo.lock
+101
-7
Cargo.lock
···
98
98
]
99
99
100
100
[[package]]
101
+
name = "anyhow"
102
+
version = "1.0.100"
103
+
source = "registry+https://github.com/rust-lang/crates.io-index"
104
+
checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
105
+
106
+
[[package]]
101
107
name = "async-channel"
102
108
version = "1.9.0"
103
109
source = "registry+https://github.com/rust-lang/crates.io-index"
···
327
333
version = "3.19.0"
328
334
source = "registry+https://github.com/rust-lang/crates.io-index"
329
335
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
336
+
337
+
[[package]]
338
+
name = "byteorder"
339
+
version = "1.5.0"
340
+
source = "registry+https://github.com/rust-lang/crates.io-index"
341
+
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
330
342
331
343
[[package]]
332
344
name = "bytes"
···
669
681
checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad"
670
682
dependencies = [
671
683
"libc",
672
-
"windows-sys 0.59.0",
684
+
"windows-sys 0.60.2",
673
685
]
674
686
675
687
[[package]]
···
991
1003
"idna",
992
1004
"ipnet",
993
1005
"once_cell",
994
-
"rand",
1006
+
"rand 0.9.2",
995
1007
"ring",
996
1008
"thiserror 2.0.12",
997
1009
"tinyvec",
···
1013
1025
"moka",
1014
1026
"once_cell",
1015
1027
"parking_lot",
1016
-
"rand",
1028
+
"rand 0.9.2",
1017
1029
"resolv-conf",
1018
1030
"smallvec",
1019
1031
"thiserror 2.0.12",
···
1757
1769
name = "pdsfs"
1758
1770
version = "0.1.0"
1759
1771
dependencies = [
1772
+
"anyhow",
1760
1773
"atrium-api",
1761
1774
"atrium-common",
1762
1775
"atrium-identity",
···
1776
1789
"serde_json",
1777
1790
"thiserror 2.0.12",
1778
1791
"tokio",
1792
+
"tokio-tungstenite",
1779
1793
"xdg",
1780
1794
]
1781
1795
···
1889
1903
1890
1904
[[package]]
1891
1905
name = "rand"
1906
+
version = "0.8.5"
1907
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1908
+
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
1909
+
dependencies = [
1910
+
"libc",
1911
+
"rand_chacha 0.3.1",
1912
+
"rand_core 0.6.4",
1913
+
]
1914
+
1915
+
[[package]]
1916
+
name = "rand"
1892
1917
version = "0.9.2"
1893
1918
source = "registry+https://github.com/rust-lang/crates.io-index"
1894
1919
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
1895
1920
dependencies = [
1896
-
"rand_chacha",
1897
-
"rand_core",
1921
+
"rand_chacha 0.9.0",
1922
+
"rand_core 0.9.3",
1923
+
]
1924
+
1925
+
[[package]]
1926
+
name = "rand_chacha"
1927
+
version = "0.3.1"
1928
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1929
+
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
1930
+
dependencies = [
1931
+
"ppv-lite86",
1932
+
"rand_core 0.6.4",
1898
1933
]
1899
1934
1900
1935
[[package]]
···
1904
1939
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
1905
1940
dependencies = [
1906
1941
"ppv-lite86",
1907
-
"rand_core",
1942
+
"rand_core 0.9.3",
1943
+
]
1944
+
1945
+
[[package]]
1946
+
name = "rand_core"
1947
+
version = "0.6.4"
1948
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1949
+
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
1950
+
dependencies = [
1951
+
"getrandom 0.2.16",
1908
1952
]
1909
1953
1910
1954
[[package]]
···
2057
2101
"errno",
2058
2102
"libc",
2059
2103
"linux-raw-sys",
2060
-
"windows-sys 0.59.0",
2104
+
"windows-sys 0.60.2",
2061
2105
]
2062
2106
2063
2107
[[package]]
···
2234
2278
]
2235
2279
2236
2280
[[package]]
2281
+
name = "sha1"
2282
+
version = "0.10.6"
2283
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2284
+
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
2285
+
dependencies = [
2286
+
"cfg-if",
2287
+
"cpufeatures",
2288
+
"digest",
2289
+
]
2290
+
2291
+
[[package]]
2237
2292
name = "sha2"
2238
2293
version = "0.10.9"
2239
2294
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2515
2570
]
2516
2571
2517
2572
[[package]]
2573
+
name = "tokio-tungstenite"
2574
+
version = "0.24.0"
2575
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2576
+
checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9"
2577
+
dependencies = [
2578
+
"futures-util",
2579
+
"log",
2580
+
"native-tls",
2581
+
"tokio",
2582
+
"tokio-native-tls",
2583
+
"tungstenite",
2584
+
]
2585
+
2586
+
[[package]]
2518
2587
name = "tokio-util"
2519
2588
version = "0.7.15"
2520
2589
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2663
2732
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
2664
2733
2665
2734
[[package]]
2735
+
name = "tungstenite"
2736
+
version = "0.24.0"
2737
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2738
+
checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
2739
+
dependencies = [
2740
+
"byteorder",
2741
+
"bytes",
2742
+
"data-encoding",
2743
+
"http 1.3.1",
2744
+
"httparse",
2745
+
"log",
2746
+
"native-tls",
2747
+
"rand 0.8.5",
2748
+
"sha1",
2749
+
"thiserror 1.0.69",
2750
+
"utf-8",
2751
+
]
2752
+
2753
+
[[package]]
2666
2754
name = "typenum"
2667
2755
version = "1.18.0"
2668
2756
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2712
2800
"idna",
2713
2801
"percent-encoding",
2714
2802
]
2803
+
2804
+
[[package]]
2805
+
name = "utf-8"
2806
+
version = "0.7.6"
2807
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2808
+
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
2715
2809
2716
2810
[[package]]
2717
2811
name = "utf8_iter"
+4
-2
Cargo.toml
+4
-2
Cargo.toml
···
4
4
edition = "2024"
5
5
6
6
[dependencies]
7
+
anyhow = "1.0"
7
8
atrium-api = "0.25.4"
8
9
atrium-common = "0.1.2"
9
10
atrium-identity = "0.1.5"
···
11
12
atrium-xrpc = "0.12.3"
12
13
atrium-xrpc-client = { version = "0.5.14", features=["isahc"] }
13
14
clap = { version = "4.5.41", features = ["cargo"] }
14
-
fuser = "0.15.1"
15
+
fuser = { version = "0.15.1", features = ["abi-7-18"] }
15
16
futures = "0.3.31"
16
17
hickory-resolver = "0.25.2"
17
18
indexmap = "2.10.0"
···
22
23
serde_ipld_dagcbor = "0.6.3"
23
24
serde_json = "1.0.141"
24
25
thiserror = "2.0.12"
25
-
tokio = { version = "1.46.1", features = ["fs"] }
26
+
tokio = { version = "1.46.1", features = ["fs", "sync", "rt-multi-thread"] }
27
+
tokio-tungstenite = { version = "0.24", features = ["native-tls"] }
26
28
xdg = "3.0.0"
+96
flake.lock
+96
flake.lock
···
1
+
{
2
+
"nodes": {
3
+
"flake-utils": {
4
+
"inputs": {
5
+
"systems": "systems"
6
+
},
7
+
"locked": {
8
+
"lastModified": 1731533236,
9
+
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
10
+
"owner": "numtide",
11
+
"repo": "flake-utils",
12
+
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
13
+
"type": "github"
14
+
},
15
+
"original": {
16
+
"owner": "numtide",
17
+
"repo": "flake-utils",
18
+
"type": "github"
19
+
}
20
+
},
21
+
"nixpkgs": {
22
+
"locked": {
23
+
"lastModified": 1757487488,
24
+
"narHash": "sha256-zwE/e7CuPJUWKdvvTCB7iunV4E/+G0lKfv4kk/5Izdg=",
25
+
"owner": "NixOS",
26
+
"repo": "nixpkgs",
27
+
"rev": "ab0f3607a6c7486ea22229b92ed2d355f1482ee0",
28
+
"type": "github"
29
+
},
30
+
"original": {
31
+
"owner": "NixOS",
32
+
"ref": "nixos-unstable",
33
+
"repo": "nixpkgs",
34
+
"type": "github"
35
+
}
36
+
},
37
+
"nixpkgs_2": {
38
+
"locked": {
39
+
"lastModified": 1744536153,
40
+
"narHash": "sha256-awS2zRgF4uTwrOKwwiJcByDzDOdo3Q1rPZbiHQg/N38=",
41
+
"owner": "NixOS",
42
+
"repo": "nixpkgs",
43
+
"rev": "18dd725c29603f582cf1900e0d25f9f1063dbf11",
44
+
"type": "github"
45
+
},
46
+
"original": {
47
+
"owner": "NixOS",
48
+
"ref": "nixpkgs-unstable",
49
+
"repo": "nixpkgs",
50
+
"type": "github"
51
+
}
52
+
},
53
+
"root": {
54
+
"inputs": {
55
+
"flake-utils": "flake-utils",
56
+
"nixpkgs": "nixpkgs",
57
+
"rust-overlay": "rust-overlay"
58
+
}
59
+
},
60
+
"rust-overlay": {
61
+
"inputs": {
62
+
"nixpkgs": "nixpkgs_2"
63
+
},
64
+
"locked": {
65
+
"lastModified": 1757644300,
66
+
"narHash": "sha256-bVIDYz31bCdZId441sqgkImnA7aYr2UQzQlyl+O2DWc=",
67
+
"owner": "oxalica",
68
+
"repo": "rust-overlay",
69
+
"rev": "591c5ae84f066bdfc9797b217df392d58eafd088",
70
+
"type": "github"
71
+
},
72
+
"original": {
73
+
"owner": "oxalica",
74
+
"repo": "rust-overlay",
75
+
"type": "github"
76
+
}
77
+
},
78
+
"systems": {
79
+
"locked": {
80
+
"lastModified": 1681028828,
81
+
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
82
+
"owner": "nix-systems",
83
+
"repo": "default",
84
+
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
85
+
"type": "github"
86
+
},
87
+
"original": {
88
+
"owner": "nix-systems",
89
+
"repo": "default",
90
+
"type": "github"
91
+
}
92
+
}
93
+
},
94
+
"root": "root",
95
+
"version": 7
96
+
}
+92
flake.nix
+92
flake.nix
···
1
+
{
2
+
description = "pdsfs - mount an atproto PDS repository as a FUSE filesystem";
3
+
4
+
inputs = {
5
+
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
6
+
rust-overlay.url = "github:oxalica/rust-overlay";
7
+
flake-utils.url = "github:numtide/flake-utils";
8
+
};
9
+
10
+
outputs = { self, nixpkgs, rust-overlay, flake-utils }:
11
+
flake-utils.lib.eachDefaultSystem (system:
12
+
let
13
+
overlays = [ (import rust-overlay) ];
14
+
pkgs = import nixpkgs {
15
+
inherit system overlays;
16
+
};
17
+
18
+
rustToolchain =
19
+
if builtins.pathExists ./rust-toolchain.toml
20
+
then pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml
21
+
else pkgs.rust-bin.stable.latest.default.override {
22
+
extensions = [ "rust-src" "rust-analyzer" ];
23
+
};
24
+
25
+
nativeBuildInputs = with pkgs; [
26
+
rustToolchain
27
+
pkg-config
28
+
];
29
+
30
+
buildInputs = with pkgs; [
31
+
fuse3
32
+
openssl
33
+
] ++ lib.optionals stdenv.isDarwin [
34
+
darwin.apple_sdk.frameworks.Security
35
+
darwin.apple_sdk.frameworks.CoreServices
36
+
];
37
+
38
+
pdsfs = pkgs.rustPlatform.buildRustPackage {
39
+
pname = "pdsfs";
40
+
version = "0.1.0";
41
+
42
+
src = ./.;
43
+
44
+
cargoLock = {
45
+
lockFile = ./Cargo.lock;
46
+
};
47
+
48
+
inherit nativeBuildInputs buildInputs;
49
+
50
+
# Skip tests that require network access or FUSE capabilities
51
+
doCheck = false;
52
+
53
+
meta = with pkgs.lib; {
54
+
description = "Mount an atproto PDS repository as a FUSE filesystem";
55
+
homepage = "https://github.com/tangled/pdsfs"; # Update with actual repository URL
56
+
license = licenses.mit; # Update with actual license
57
+
maintainers = [ ]; # Add maintainers if desired
58
+
platforms = platforms.linux ++ platforms.darwin;
59
+
};
60
+
};
61
+
in
62
+
{
63
+
packages = {
64
+
default = pdsfs;
65
+
pdsfs = pdsfs;
66
+
};
67
+
68
+
devShells.default = pkgs.mkShell {
69
+
inherit buildInputs nativeBuildInputs;
70
+
71
+
shellHook = ''
72
+
echo "pdsfs development environment"
73
+
echo "Run 'cargo build' to build the project"
74
+
echo "Run 'cargo run -- <handle>' to mount a PDS repository"
75
+
echo ""
76
+
echo "FUSE development notes:"
77
+
echo "- Ensure you have FUSE permissions (add user to 'fuse' group if needed)"
78
+
echo "- Use 'fusermount -u <mountpoint>' to unmount if needed"
79
+
'';
80
+
};
81
+
82
+
# Allow building with different Rust versions
83
+
devShells.rust-nightly = pkgs.mkShell {
84
+
buildInputs = buildInputs ++ [
85
+
(pkgs.rust-bin.nightly.latest.default.override {
86
+
extensions = [ "rust-src" "rust-analyzer" ];
87
+
})
88
+
pkgs.pkg-config
89
+
];
90
+
};
91
+
});
92
+
}
+165
-6
readme.txt
+165
-6
readme.txt
···
1
-
pdsfs
2
-
-----
1
+
pdsfs - mount an atproto PDS repository as a FUSE filesystem.
2
+
3
+
a PDS repository[0] contains all data published by a user to
4
+
the atmosphere[1]. it is exportable as a CAR
5
+
(content-addressable archive) file. pdsfs is a tool that
6
+
mounts this CAR file as a readonly-FUSE filesystem, allowing
7
+
quick and easy exploration.
8
+
9
+
to motivate the need for such a program, we could begin by
10
+
mounting a repository:
11
+
12
+
13
+
λ pdsfs oppi.li
14
+
mounted at "mnt"
15
+
hit enter to unmount and exit...
16
+
17
+
18
+
oppi.li is my handle in the atmosphere. the tool does some
19
+
hard work to determine the location of my PDS repository
20
+
given my handle, but that is not important. lets have a look
21
+
around:
22
+
23
+
24
+
λ ls mnt/
25
+
did:plc:qfpnj4og54vl56wngdriaxug/
26
+
27
+
28
+
the did:plc:stuff is my DID. lets dig deeper:
29
+
30
+
31
+
λ ls mnt/did\:plc\:qfpnj4og54vl56wngdriaxug/
32
+
app.bsky.actor.profile/ place.stream.chat.message/
33
+
app.bsky.actor.status/ place.stream.chat.profile/
34
+
app.bsky.feed.generator/ place.stream.key/
35
+
app.bsky.feed.like/ place.stream.livestream/
36
+
app.bsky.feed.post/ sh.tangled.actor.profile/
37
+
app.bsky.feed.repost/ sh.tangled.feed.reaction/
38
+
app.bsky.graph.block/ sh.tangled.feed.star/
39
+
app.bsky.graph.follow/ sh.tangled.graph.follow/
40
+
app.rocksky.album/ sh.tangled.knot/
41
+
app.rocksky.artist/ sh.tangled.knot.member/
42
+
.
43
+
.
44
+
.
45
+
46
+
47
+
we have some data from the repository now. these are
48
+
"collections". if i want to publish a post to bluesky, i
49
+
would write content to the "app.bsky.feed.post" collection
50
+
in my PDS. this will then be indexed by a bluesky appview
51
+
(bsky.app or zeppelin.social to name a few) and show up
52
+
under my profile there.
53
+
54
+
pdsfs is kind enough to deserialize the data stored (as
55
+
CBOR) in the PDS repository to JSON:
56
+
57
+
58
+
λ cat sh.tangled.repo/3ljidbevrjh22 | jq
59
+
{
60
+
"$type": "sh.tangled.repo",
61
+
"addedAt": "2025-03-03T16:04:13Z",
62
+
"knot": "knot1.tangled.sh",
63
+
"name": "hello-world",
64
+
"owner": "did:plc:3danwc67lo7obz2fmdg6jxcr"
65
+
}
66
+
67
+
68
+
thanks pdsfs!
69
+
70
+
i publish my music listening habits to my PDS to the
71
+
"app.rocksky.scrobble" collection, because rocksky[4]
72
+
recognizes and indexes this collection. i have wired up my
73
+
personal navidrome instance to write data of this form into
74
+
my PDS everytime i listen to a track. here are my top
75
+
artists in order:
76
+
77
+
78
+
λ cat app.rocksky.scrobble/* | jq -r '.artist' | sort | uniq -c | sort -nr
79
+
117 Thank You Scientist
80
+
45 FKJ
81
+
34 Covet
82
+
33 VOLA
83
+
23 Sam Cooke
84
+
22 Dark Tranquillity
85
+
21 Piero Piccioni
86
+
12 Bloodywood
87
+
11 Frank Sinatra
88
+
10 Dream Theater
89
+
90
+
91
+
it is true, i love sam cooke.
92
+
93
+
pdsfs allows mounting multiple repositories at a time. allow
94
+
me to introduce my friends:
95
+
96
+
97
+
λ pdsfs icyphox.sh anil.recoil.org steveklabnik.com tangled.sh
98
+
using cached CAR file for...did:plc:hwevmowznbiukdf6uk5dwrrq
99
+
using cached CAR file for...did:plc:nhyitepp3u4u6fcfboegzcjw
100
+
download complete for...did:plc:3danwc67lo7obz2fmdg6jxcr
101
+
download complete for...did:plc:wshs7t2adsemcrrd4snkeqli
102
+
mounted at "mnt"
103
+
hit enter to unmount and exit...
104
+
105
+
106
+
# -- in a separate shell --
107
+
108
+
109
+
λ cat ./mnt/*/app.bsky.actor.profile/* \
110
+
| jq -r '"\(.displayName)\n\(.description)\n---"' \
111
+
| sed '/^$/d'
112
+
Steve Klabnik
113
+
#rustlang, #jj-vcs, atproto, shitposts, urbanism. I
114
+
contain multitudes. Working on #ruelang but just for
115
+
fun. Currently in Austin, TX, but from Pittsburgh.
116
+
Previously in Bushwick, the Mission, LA.
117
+
---
118
+
Anirudh Oppiliappan
119
+
building @tangled.sh — code collaboration platform built
120
+
on atproto helsinki, finland · https://anirudh.fi ·
121
+
(somewhat) effective altruist
122
+
---
123
+
Anil Madhavapeddy
124
+
Professor of Planetary Computing at the University of
125
+
Cambridge @cst.cam.ac.uk, where I co-lead the
126
+
@eeg.cl.cam.ac.uk, and am also to found at
127
+
@conservation.cam.ac.uk. Homepage at
128
+
https://anil.recoil.org
129
+
---
130
+
Tangled
131
+
https://tangled.sh is a git collaboration platform built
132
+
on atproto. Social coding, but for real this time!
133
+
Discord: chat.tangled.sh IRC: #tangled @ libera.chat
134
+
Built by @oppi.li & @icyphox.sh
135
+
---
3
136
4
-
mount an atproto PDS repository as a fuse3 filesystem.
5
137
138
+
all my friends use tangled.sh, which requires them to
139
+
publish their ssh public key to their PDii (PDSes?). perhaps
140
+
i would like to add their keys to my allowed_signers file to
141
+
verify their commit signatures:
6
142
7
-
usage
8
-
-----
9
143
10
-
cargo run -- <DID/handle>
144
+
λ for dir in ./*/sh.tangled.publicKey;
145
+
do cat $dir/$(ls -r $dir | head -n1) | jq -r '.key';
146
+
done | tee allowed_signers
147
+
ssh-rsa AAAAB3NzaC1yc2EAAA...dHPqc= steveklabnik@DESKTOP-VV370NK
148
+
ssh-ed25519 AAAAC3NzaC1lZD...g9bAdk icy@wyndle
149
+
ssh-ed25519 AAAAC3NzaC1lZD...BqlM1u anil@recoil.org
150
+
151
+
152
+
153
+
---
154
+
155
+
156
+
FUSE is quite liberating in that it allows you to represent
157
+
anything as a filesystem. when applications like ls and cat
158
+
are executed, the system calls to open and read are rerouted
159
+
to your custom fs implementation (pdsfs in this case). the
160
+
custom fs implementation is free to do as it pleases, in fact
161
+
the first iteration of pdsfs accessed the network for each
162
+
open/read call to fetch live data from PDii.
163
+
164
+
165
+
[0]: https://atproto.com/guides/data-repos
166
+
[1]: https://atproto.com
167
+
[2]: https://docs.bsky.app/docs/api/com-atproto-sync-get-repo
168
+
[3]: https://tangled.sh/keys/oppi.li
169
+
[4]: https://rocksky.app
+6
src/error.rs
+6
src/error.rs
···
2
2
pub enum Error {
3
3
#[error("atproto error: {0}")]
4
4
GetRepo(#[from] atrium_xrpc::Error<atrium_api::com::atproto::sync::get_repo::Error>),
5
+
#[error("repo build error: {0}")]
6
+
Repo(#[from] atrium_repo::repo::Error),
7
+
#[error("car store error: {0}")]
8
+
Car(#[from] atrium_repo::blockstore::CarError),
9
+
#[error("identity error: {0}")]
10
+
Identity(#[from] atrium_identity::Error),
5
11
#[error("io error: {0}")]
6
12
Io(#[from] std::io::Error),
7
13
}
+318
src/firehose.rs
+318
src/firehose.rs
···
1
+
use anyhow::{anyhow, Result};
2
+
use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID};
3
+
use atrium_api::client::AtpServiceClient;
4
+
use atrium_api::com;
5
+
use atrium_api::types;
6
+
use atrium_xrpc_client::isahc::IsahcClient;
7
+
use futures::StreamExt;
8
+
use ipld_core::ipld::Ipld;
9
+
use std::io::Cursor;
10
+
use std::sync::{Arc, Mutex};
11
+
use tokio_tungstenite::connect_async;
12
+
use tokio_tungstenite::tungstenite::Message;
13
+
14
+
use crate::fs::{PdsFsCollection, PdsFsEntry, PdsFsRecord};
15
+
use indexmap::{IndexMap, IndexSet};
16
+
17
+
/// Frame header types for WebSocket messages
18
+
#[derive(Debug, Clone, PartialEq, Eq)]
19
+
enum FrameHeader {
20
+
Message(Option<String>),
21
+
Error,
22
+
}
23
+
24
+
impl TryFrom<Ipld> for FrameHeader {
25
+
type Error = anyhow::Error;
26
+
27
+
fn try_from(value: Ipld) -> Result<Self> {
28
+
if let Ipld::Map(map) = value {
29
+
if let Some(Ipld::Integer(i)) = map.get("op") {
30
+
match i {
31
+
1 => {
32
+
let t = if let Some(Ipld::String(s)) = map.get("t") {
33
+
Some(s.clone())
34
+
} else {
35
+
None
36
+
};
37
+
return Ok(FrameHeader::Message(t));
38
+
}
39
+
-1 => return Ok(FrameHeader::Error),
40
+
_ => {}
41
+
}
42
+
}
43
+
}
44
+
Err(anyhow!("invalid frame type"))
45
+
}
46
+
}
47
+
48
+
/// Frame types for parsed WebSocket messages
49
+
#[derive(Debug, Clone, PartialEq, Eq)]
50
+
pub enum Frame {
51
+
Message(Option<String>, MessageFrame),
52
+
Error(ErrorFrame),
53
+
}
54
+
55
+
#[derive(Debug, Clone, PartialEq, Eq)]
56
+
pub struct MessageFrame {
57
+
pub body: Vec<u8>,
58
+
}
59
+
60
+
#[derive(Debug, Clone, PartialEq, Eq)]
61
+
pub struct ErrorFrame {}
62
+
63
+
impl TryFrom<&[u8]> for Frame {
64
+
type Error = anyhow::Error;
65
+
66
+
fn try_from(value: &[u8]) -> Result<Self> {
67
+
let mut cursor = Cursor::new(value);
68
+
let (left, right) = match serde_ipld_dagcbor::from_reader::<Ipld, _>(&mut cursor) {
69
+
Err(serde_ipld_dagcbor::DecodeError::TrailingData) => {
70
+
value.split_at(cursor.position() as usize)
71
+
}
72
+
_ => {
73
+
return Err(anyhow!("invalid frame type"));
74
+
}
75
+
};
76
+
let header = FrameHeader::try_from(serde_ipld_dagcbor::from_slice::<Ipld>(left)?)?;
77
+
if let FrameHeader::Message(t) = &header {
78
+
Ok(Frame::Message(t.clone(), MessageFrame { body: right.to_vec() }))
79
+
} else {
80
+
Ok(Frame::Error(ErrorFrame {}))
81
+
}
82
+
}
83
+
}
84
+
85
+
/// Subscribe to a repo's firehose and update inodes on changes
86
+
pub async fn subscribe_to_repo<R>(
87
+
did: String,
88
+
pds: String,
89
+
inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>,
90
+
sizes: Arc<Mutex<IndexMap<usize, u64>>>,
91
+
content_cache: Arc<Mutex<IndexMap<String, String>>>,
92
+
notifier: fuser::Notifier,
93
+
) -> Result<()>
94
+
where
95
+
R: atrium_repo::blockstore::AsyncBlockStoreRead,
96
+
{
97
+
// Strip https:// or http:// prefix from PDS URL if present
98
+
let pds_host = pds.trim_start_matches("https://").trim_start_matches("http://");
99
+
let url = format!("wss://{}/xrpc/{}", pds_host, NSID);
100
+
println!("Connecting to firehose: {}", url);
101
+
102
+
let (mut stream, _) = connect_async(url).await?;
103
+
println!("Connected to firehose for {}", did);
104
+
105
+
loop {
106
+
match stream.next().await {
107
+
Some(Ok(Message::Binary(data))) => {
108
+
if let Ok(Frame::Message(Some(t), msg)) = Frame::try_from(data.as_slice()) {
109
+
if t.as_str() == "#commit" {
110
+
if let Ok(commit) = serde_ipld_dagcbor::from_reader::<Commit, _>(msg.body.as_slice()) {
111
+
// Only process commits for our DID
112
+
if commit.repo.as_str() == did {
113
+
if let Err(e) = handle_commit(&commit, &inodes, &sizes, &content_cache, &did, &pds, ¬ifier).await {
114
+
eprintln!("Error handling commit: {:?}", e);
115
+
}
116
+
}
117
+
}
118
+
}
119
+
}
120
+
}
121
+
Some(Ok(_)) => {} // Ignore other message types
122
+
Some(Err(e)) => {
123
+
eprintln!("WebSocket error: {}", e);
124
+
break;
125
+
}
126
+
None => {
127
+
eprintln!("WebSocket closed");
128
+
break;
129
+
}
130
+
}
131
+
}
132
+
133
+
Ok(())
134
+
}
135
+
136
+
/// Handle a commit by updating the inode tree and notifying Finder
137
+
async fn handle_commit(
138
+
commit: &Commit,
139
+
inodes: &Arc<Mutex<IndexSet<PdsFsEntry>>>,
140
+
sizes: &Arc<Mutex<IndexMap<usize, u64>>>,
141
+
content_cache: &Arc<Mutex<IndexMap<String, String>>>,
142
+
did: &str,
143
+
pds: &str,
144
+
notifier: &fuser::Notifier,
145
+
) -> Result<()> {
146
+
// Find the DID inode
147
+
let did_entry = PdsFsEntry::Did(did.to_string());
148
+
let did_inode = {
149
+
let inodes_lock = inodes.lock().unwrap();
150
+
inodes_lock.get_index_of(&did_entry)
151
+
};
152
+
153
+
let Some(did_inode) = did_inode else {
154
+
return Err(anyhow!("DID not found in inodes"));
155
+
};
156
+
157
+
for op in &commit.ops {
158
+
let Some((collection, rkey)) = op.path.split_once('/') else {
159
+
continue;
160
+
};
161
+
162
+
match op.action.as_str() {
163
+
"create" => {
164
+
// Fetch the record from PDS
165
+
let record_key = format!("{}/{}", collection, rkey);
166
+
let cache_key = format!("{}/{}", did, record_key);
167
+
168
+
// Fetch record content from PDS
169
+
match fetch_record(pds, did, collection, rkey).await {
170
+
Ok(content) => {
171
+
let content_len = content.len() as u64;
172
+
173
+
// Add the record to inodes
174
+
let (collection_inode, record_inode) = {
175
+
let mut inodes_lock = inodes.lock().unwrap();
176
+
177
+
// Ensure collection exists
178
+
let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
179
+
parent: did_inode,
180
+
nsid: collection.to_string(),
181
+
});
182
+
let (collection_inode, _) = inodes_lock.insert_full(collection_entry);
183
+
184
+
// Add the record
185
+
let record_entry = PdsFsEntry::Record(PdsFsRecord {
186
+
parent: collection_inode,
187
+
rkey: rkey.to_string(),
188
+
});
189
+
let (record_inode, _) = inodes_lock.insert_full(record_entry);
190
+
(collection_inode, record_inode)
191
+
};
192
+
193
+
// Cache the content and size
194
+
content_cache.lock().unwrap().insert(cache_key, content);
195
+
sizes.lock().unwrap().insert(record_inode, content_len);
196
+
197
+
// Notify Finder about the new file (release lock first)
198
+
let filename = format!("{}.json", rkey);
199
+
if let Err(e) = notifier.inval_entry(collection_inode as u64, filename.as_ref()) {
200
+
eprintln!("Failed to invalidate entry for {}: {}", filename, e);
201
+
}
202
+
203
+
println!("Created: {}/{}", collection, rkey);
204
+
}
205
+
Err(e) => {
206
+
eprintln!("Failed to fetch record {}/{}: {}", collection, rkey, e);
207
+
}
208
+
}
209
+
}
210
+
"delete" => {
211
+
// Get inodes before removing
212
+
let (collection_inode_opt, child_inode_opt) = {
213
+
let mut inodes_lock = inodes.lock().unwrap();
214
+
215
+
// Find the collection
216
+
let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
217
+
parent: did_inode,
218
+
nsid: collection.to_string(),
219
+
});
220
+
let collection_inode = inodes_lock.get_index_of(&collection_entry);
221
+
222
+
// Find and remove the record
223
+
let child_inode = if let Some(coll_ino) = collection_inode {
224
+
let record_entry = PdsFsEntry::Record(PdsFsRecord {
225
+
parent: coll_ino,
226
+
rkey: rkey.to_string(),
227
+
});
228
+
let child_ino = inodes_lock.get_index_of(&record_entry);
229
+
inodes_lock.shift_remove(&record_entry);
230
+
child_ino
231
+
} else {
232
+
None
233
+
};
234
+
235
+
(collection_inode, child_inode)
236
+
};
237
+
238
+
// Notify Finder about the deletion (release lock first)
239
+
if let (Some(coll_ino), Some(child_ino)) = (collection_inode_opt, child_inode_opt) {
240
+
// Remove from caches
241
+
sizes.lock().unwrap().shift_remove(&child_ino);
242
+
let cache_key = format!("{}/{}/{}", did, collection, rkey);
243
+
content_cache.lock().unwrap().shift_remove(&cache_key);
244
+
245
+
let filename = format!("{}.json", rkey);
246
+
if let Err(e) = notifier.delete(coll_ino as u64, child_ino as u64, filename.as_ref()) {
247
+
eprintln!("Failed to notify deletion for {}: {}", filename, e);
248
+
}
249
+
}
250
+
251
+
println!("Deleted: {}/{}", collection, rkey);
252
+
}
253
+
"update" => {
254
+
// For updates, invalidate the inode so content is re-fetched
255
+
let record_inode_opt = {
256
+
let inodes_lock = inodes.lock().unwrap();
257
+
let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
258
+
parent: did_inode,
259
+
nsid: collection.to_string(),
260
+
});
261
+
262
+
if let Some(collection_inode) = inodes_lock.get_index_of(&collection_entry) {
263
+
let record_entry = PdsFsEntry::Record(PdsFsRecord {
264
+
parent: collection_inode,
265
+
rkey: rkey.to_string(),
266
+
});
267
+
inodes_lock.get_index_of(&record_entry)
268
+
} else {
269
+
None
270
+
}
271
+
};
272
+
273
+
// Notify Finder to invalidate the inode (release lock first)
274
+
if let Some(record_ino) = record_inode_opt {
275
+
// Clear caches so content is recalculated
276
+
sizes.lock().unwrap().shift_remove(&record_ino);
277
+
let cache_key = format!("{}/{}/{}", did, collection, rkey);
278
+
content_cache.lock().unwrap().shift_remove(&cache_key);
279
+
280
+
// Invalidate the entire inode (metadata and all data)
281
+
if let Err(e) = notifier.inval_inode(record_ino as u64, 0, 0) {
282
+
eprintln!("Failed to invalidate inode for {}/{}: {}", collection, rkey, e);
283
+
}
284
+
}
285
+
286
+
println!("Updated: {}/{}", collection, rkey);
287
+
}
288
+
_ => {}
289
+
}
290
+
}
291
+
292
+
Ok(())
293
+
}
294
+
295
+
/// Fetch a record from the PDS
296
+
async fn fetch_record(pds: &str, did: &str, collection: &str, rkey: &str) -> Result<String> {
297
+
let client = AtpServiceClient::new(IsahcClient::new(pds));
298
+
let did = types::string::Did::new(did.to_string()).map_err(|e| anyhow!(e))?;
299
+
let collection_nsid = types::string::Nsid::new(collection.to_string()).map_err(|e| anyhow!(e))?;
300
+
let record_key = types::string::RecordKey::new(rkey.to_string()).map_err(|e| anyhow!(e))?;
301
+
302
+
let response = client
303
+
.service
304
+
.com
305
+
.atproto
306
+
.repo
307
+
.get_record(com::atproto::repo::get_record::Parameters::from(
308
+
com::atproto::repo::get_record::ParametersData {
309
+
cid: None,
310
+
collection: collection_nsid,
311
+
repo: types::string::AtIdentifier::Did(did),
312
+
rkey: record_key,
313
+
}
314
+
))
315
+
.await?;
316
+
317
+
Ok(serde_json::to_string_pretty(&response.value)?)
318
+
}
+282
-93
src/fs.rs
+282
-93
src/fs.rs
···
1
-
use std::{collections::BTreeMap, time};
1
+
use std::sync::{Arc, Mutex};
2
+
use std::time::{self, SystemTime, UNIX_EPOCH, Duration};
2
3
3
4
use atrium_repo::{Repository, blockstore::AsyncBlockStoreRead};
4
5
use futures::StreamExt;
5
-
use indexmap::IndexSet;
6
+
use indexmap::{IndexMap, IndexSet};
7
+
8
+
type Inode = usize;
9
+
10
+
/// Decode a TID (timestamp identifier) into the `SystemTime` it encodes.
///
/// A TID is exactly 13 characters from the base32-sortable alphabet
/// `234567abcdefghijklmnopqrstuvwxyz`, big-endian, five bits per character.
/// The low 10 bits of the decoded integer are a clock identifier; the
/// remaining upper bits are microseconds since the Unix epoch.
///
/// Returns `None` for inputs of the wrong length, characters outside the
/// alphabet, or a timestamp `SystemTime` cannot represent.
fn tid_to_timestamp(tid: &str) -> Option<SystemTime> {
    const S32_CHAR: &[u8] = b"234567abcdefghijklmnopqrstuvwxyz";

    if tid.len() != 13 {
        return None;
    }

    // Fold the characters into one integer, most-significant character first;
    // any character not in the alphabet aborts the whole decode.
    let value = tid.chars().try_fold(0u64, |acc, ch| {
        let digit = S32_CHAR.iter().position(|&c| c as char == ch)?;
        Some((acc << 5) | digit as u64)
    })?;

    // Discard the 10-bit clock identifier to recover the microsecond timestamp.
    UNIX_EPOCH.checked_add(Duration::from_micros(value >> 10))
}
30
+
6
31
pub struct PdsFs<R> {
7
-
repo: Repository<R>,
8
-
inodes: IndexSet<PdsFsEntry>,
32
+
repos: Arc<Mutex<IndexMap<String, Repository<R>>>>,
33
+
inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>,
34
+
sizes: Arc<Mutex<IndexMap<Inode, u64>>>,
35
+
content_cache: Arc<Mutex<IndexMap<String, String>>>,
36
+
rt: tokio::runtime::Runtime,
9
37
}
10
38
11
39
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
12
40
pub enum PdsFsEntry {
13
41
Zero,
14
42
Root,
15
-
Collection(String),
43
+
Did(String),
44
+
Collection(PdsFsCollection),
16
45
Record(PdsFsRecord),
17
46
}
18
47
48
+
impl PdsFsEntry {
49
+
fn as_collection(&self) -> Option<&PdsFsCollection> {
50
+
match &self {
51
+
Self::Collection(c) => Some(c),
52
+
_ => None,
53
+
}
54
+
}
55
+
56
+
fn as_did(&self) -> Option<&String> {
57
+
match &self {
58
+
Self::Did(d) => Some(d),
59
+
_ => None,
60
+
}
61
+
}
62
+
63
+
fn unwrap_collection(&self) -> &PdsFsCollection {
64
+
self.as_collection().unwrap()
65
+
}
66
+
67
+
fn unwrap_did(&self) -> &String {
68
+
self.as_did().unwrap()
69
+
}
70
+
}
71
+
72
+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
73
+
pub struct PdsFsCollection {
74
+
pub parent: Inode,
75
+
pub nsid: String,
76
+
}
77
+
19
78
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
20
79
pub struct PdsFsRecord {
21
-
collection: String,
22
-
rkey: String,
80
+
pub parent: Inode,
81
+
pub rkey: String,
23
82
}
24
83
25
-
impl PdsFsRecord {
26
-
fn key(&self) -> String {
27
-
format!("{}/{}", self.collection, self.rkey)
28
-
}
29
-
}
84
+
// impl PdsFsRecord {
85
+
// fn key(&self) -> String {
86
+
// format!("{}/{}", self.collection, self.rkey)
87
+
// }
88
+
// }
30
89
31
90
const TTL: time::Duration = time::Duration::from_secs(300);
32
91
const BLKSIZE: u32 = 512;
···
53
112
where
54
113
R: AsyncBlockStoreRead,
55
114
{
56
-
pub async fn new(mut repo: Repository<R>) -> Self {
115
+
pub fn new() -> Self {
116
+
PdsFs {
117
+
repos: Arc::new(Mutex::new(Default::default())),
118
+
inodes: Arc::new(Mutex::new(IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]))),
119
+
sizes: Arc::new(Mutex::new(Default::default())),
120
+
content_cache: Arc::new(Mutex::new(Default::default())),
121
+
rt: tokio::runtime::Runtime::new().unwrap(),
122
+
}
123
+
}
124
+
125
+
pub fn get_shared_state(&self) -> (Arc<Mutex<IndexMap<String, Repository<R>>>>, Arc<Mutex<IndexSet<PdsFsEntry>>>, Arc<Mutex<IndexMap<Inode, u64>>>, Arc<Mutex<IndexMap<String, String>>>) {
126
+
(Arc::clone(&self.repos), Arc::clone(&self.inodes), Arc::clone(&self.sizes), Arc::clone(&self.content_cache))
127
+
}
128
+
129
+
pub async fn add(&mut self, did: String, mut repo: Repository<R>) {
57
130
let mut mst = repo.tree();
58
131
59
-
// collect all keys and group by collection
60
-
let mut keys = Box::pin(mst.keys());
61
-
let mut collections: BTreeMap<String, Vec<PdsFsRecord>> = BTreeMap::new();
132
+
let did_inode = {
133
+
let mut inodes = self.inodes.lock().unwrap();
134
+
let (did_inode, _) = inodes.insert_full(PdsFsEntry::Did(did.clone()));
135
+
did_inode
136
+
};
62
137
138
+
let mut keys = Box::pin(mst.keys());
63
139
while let Some(Ok(key)) = keys.next().await {
64
-
if let Some((collection, rkey)) = key.split_once("/") {
65
-
let record = PdsFsRecord {
66
-
collection: collection.to_owned(),
140
+
if let Some((collection_name, rkey)) = key.split_once("/") {
141
+
let mut inodes = self.inodes.lock().unwrap();
142
+
let (collection_inode, _) = inodes.insert_full(PdsFsEntry::Collection(PdsFsCollection {
143
+
parent: did_inode,
144
+
nsid: collection_name.to_owned(),
145
+
}));
146
+
147
+
inodes.insert(PdsFsEntry::Record(PdsFsRecord {
148
+
parent: collection_inode,
67
149
rkey: rkey.to_owned(),
68
-
};
69
-
70
-
collections
71
-
.entry(collection.to_owned())
72
-
.or_insert_with(Vec::new)
73
-
.push(record);
150
+
}));
74
151
}
75
152
}
76
153
77
154
drop(keys);
78
155
drop(mst);
79
156
80
-
// build inode structure with proper ordering
81
-
let mut inodes = IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]);
82
-
83
-
// add collections first
84
-
for n in collections.keys().cloned() {
85
-
inodes.insert(PdsFsEntry::Collection(n));
86
-
}
87
-
88
-
// then add all records grouped by collection
89
-
for r in collections.values().flatten().cloned() {
90
-
inodes.insert(PdsFsEntry::Record(r));
91
-
}
92
-
93
-
println!("constructed {} inodes", inodes.len());
94
-
95
-
PdsFs { repo, inodes }
157
+
self.repos.lock().unwrap().insert(did, repo);
96
158
}
97
159
98
160
fn attr(&mut self, ino: u64) -> fuser::FileAttr {
99
-
match self.inodes.get_index(ino as usize) {
161
+
let inodes = self.inodes.lock().unwrap();
162
+
match inodes.get_index(ino as usize) {
100
163
Some(PdsFsEntry::Root) => ROOTDIR_ATTR,
101
164
Some(PdsFsEntry::Collection(_)) => fuser::FileAttr {
102
165
ino,
···
115
178
flags: 0,
116
179
blksize: BLKSIZE,
117
180
},
181
+
Some(PdsFsEntry::Did(_)) => fuser::FileAttr {
182
+
ino,
183
+
size: 0,
184
+
blocks: 0,
185
+
atime: time::UNIX_EPOCH,
186
+
mtime: time::UNIX_EPOCH,
187
+
ctime: time::UNIX_EPOCH,
188
+
crtime: time::UNIX_EPOCH,
189
+
kind: fuser::FileType::Directory,
190
+
perm: 0o755,
191
+
nlink: 2,
192
+
uid: 1000,
193
+
gid: 1000,
194
+
rdev: 0,
195
+
flags: 0,
196
+
blksize: BLKSIZE,
197
+
},
118
198
Some(PdsFsEntry::Record(r)) => {
119
-
let rt = tokio::runtime::Runtime::new().unwrap();
120
-
let size = rt
121
-
.block_on(self.repo.get_raw::<ipld_core::ipld::Ipld>(&r.key()))
122
-
.ok()
123
-
.flatten()
124
-
.map_or(0, |v| serde_json::to_string(&v).unwrap().len())
125
-
as u64;
199
+
let col = inodes[r.parent].unwrap_collection();
200
+
let did = inodes[col.parent].unwrap_did().clone();
201
+
let rkey = r.rkey.clone();
202
+
let collection_nsid = col.nsid.clone();
203
+
drop(inodes);
204
+
205
+
// Check cache first
206
+
let size = {
207
+
let sizes = self.sizes.lock().unwrap();
208
+
if let Some(&cached_size) = sizes.get(&(ino as usize)) {
209
+
cached_size
210
+
} else {
211
+
drop(sizes);
212
+
// Not in cache, try to fetch from repo
213
+
let mut repos = self.repos.lock().unwrap();
214
+
let repo = &mut repos[&did];
215
+
let key = format!("{}/{}", collection_nsid, rkey);
216
+
let size = self
217
+
.rt
218
+
.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key))
219
+
.ok()
220
+
.flatten()
221
+
.map_or(500, |v| serde_json::to_string_pretty(&v).unwrap().len())
222
+
as u64;
223
+
// Cache it for next time
224
+
self.sizes.lock().unwrap().insert(ino as usize, size);
225
+
size
226
+
}
227
+
};
126
228
let blocks = ((size as u32 + BLKSIZE - 1) / BLKSIZE) as u64;
229
+
230
+
// Decode TID to get creation timestamp
231
+
let timestamp = tid_to_timestamp(&rkey).unwrap_or(time::UNIX_EPOCH);
232
+
127
233
fuser::FileAttr {
128
234
ino,
129
235
size,
130
236
blocks,
131
-
atime: time::UNIX_EPOCH,
132
-
mtime: time::UNIX_EPOCH,
133
-
ctime: time::UNIX_EPOCH,
134
-
crtime: time::UNIX_EPOCH,
237
+
atime: timestamp,
238
+
mtime: timestamp,
239
+
ctime: timestamp,
240
+
crtime: timestamp,
135
241
kind: fuser::FileType::RegularFile,
136
242
perm: 0o644,
137
243
nlink: 1,
···
139
245
gid: 20,
140
246
rdev: 0,
141
247
flags: 0,
142
-
blksize: 512,
248
+
blksize: BLKSIZE,
143
249
}
144
250
}
145
251
_ => panic!("zero"),
···
158
264
_fh: Option<u64>,
159
265
reply: fuser::ReplyAttr,
160
266
) {
161
-
if (ino as usize) < self.inodes.len() {
267
+
let len = self.inodes.lock().unwrap().len();
268
+
if (ino as usize) < len {
162
269
reply.attr(&TTL, &self.attr(ino as u64))
163
270
} else {
164
271
reply.error(libc::ENOENT)
···
173
280
offset: i64,
174
281
mut reply: fuser::ReplyDirectory,
175
282
) {
176
-
match self.inodes.get_index(ino as usize) {
283
+
let inodes = self.inodes.lock().unwrap();
284
+
match inodes.get_index(ino as usize) {
177
285
Some(PdsFsEntry::Root) => {
178
286
let entries: Vec<_> = vec![(ino, ".".to_string()), (ino, "..".to_string())]
179
287
.into_iter()
180
-
.chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
181
-
if let PdsFsEntry::Collection(name) = e {
182
-
Some((i as u64, name.clone()))
288
+
.chain(inodes.iter().enumerate().filter_map(|(i, e)| {
289
+
if let PdsFsEntry::Did(did) = e {
290
+
Some((i as u64, did.clone()))
183
291
} else {
184
292
None
185
293
}
186
294
}))
187
295
.collect();
296
+
drop(inodes);
188
297
189
298
for (index, (inode_num, name)) in
190
299
entries.into_iter().enumerate().skip(offset as usize)
···
200
309
}
201
310
reply.ok()
202
311
}
203
-
Some(PdsFsEntry::Collection(collection_name)) => {
204
-
let entries: Vec<_> = vec![
205
-
(ino, ".".to_string()),
206
-
(1, "..".to_string()), // Parent is root (inode 1)
207
-
]
208
-
.into_iter()
209
-
.chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
210
-
if let PdsFsEntry::Record(record) = e {
211
-
if record.collection == *collection_name {
212
-
Some((i as u64, record.rkey.clone()))
312
+
Some(PdsFsEntry::Did(_)) => {
313
+
let entries: Vec<_> = vec![(ino, ".".to_string()), (1, "..".to_string())]
314
+
.into_iter()
315
+
.chain(inodes.iter().enumerate().filter_map(|(i, e)| {
316
+
if let PdsFsEntry::Collection(col) = e {
317
+
if col.parent == ino as usize {
318
+
Some((i as u64, col.nsid.clone()))
319
+
} else {
320
+
None
321
+
}
213
322
} else {
214
323
None
215
324
}
216
-
} else {
217
-
None
325
+
}))
326
+
.collect();
327
+
drop(inodes);
328
+
329
+
for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) {
330
+
let full = reply.add(
331
+
inode_num,
332
+
(index + 1) as i64,
333
+
if name.starts_with('.') {
334
+
fuser::FileType::Directory
335
+
} else {
336
+
fuser::FileType::RegularFile
337
+
},
338
+
name,
339
+
);
340
+
if full {
341
+
break;
218
342
}
219
-
}))
220
-
.collect();
343
+
}
344
+
345
+
reply.ok();
346
+
}
347
+
Some(PdsFsEntry::Collection(c)) => {
348
+
let parent_ino = c.parent;
349
+
let entries: Vec<_> = [(ino, ".".to_string()), (parent_ino as u64, "..".to_string())]
350
+
.into_iter()
351
+
.chain(inodes.iter().enumerate().filter_map(|(i, e)| {
352
+
if let PdsFsEntry::Record(record) = e {
353
+
if record.parent == ino as usize {
354
+
Some((i as u64, format!("{}.json", record.rkey)))
355
+
} else {
356
+
None
357
+
}
358
+
} else {
359
+
None
360
+
}
361
+
}))
362
+
.collect();
363
+
drop(inodes);
221
364
222
-
for (index, (inode_num, name)) in
223
-
entries.into_iter().enumerate().skip(offset as usize)
224
-
{
225
-
if reply.add(
365
+
for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) {
366
+
let full = reply.add(
226
367
inode_num,
227
368
(index + 1) as i64,
228
369
if name.starts_with('.') {
···
231
372
fuser::FileType::RegularFile
232
373
},
233
374
name,
234
-
) {
375
+
);
376
+
if full {
235
377
break;
236
378
}
237
379
}
380
+
238
381
reply.ok()
239
382
}
240
-
_ => reply.error(libc::ENOENT),
383
+
_ => {
384
+
drop(inodes);
385
+
reply.error(libc::ENOENT)
386
+
}
241
387
}
242
388
}
243
389
···
248
394
name: &std::ffi::OsStr,
249
395
reply: fuser::ReplyEntry,
250
396
) {
251
-
match self.inodes.get_index(parent as usize) {
397
+
let inodes = self.inodes.lock().unwrap();
398
+
match inodes.get_index(parent as usize) {
252
399
Some(PdsFsEntry::Root) => {
253
-
let collection = PdsFsEntry::Collection(name.to_string_lossy().to_string());
254
-
if let Some(ino) = self.inodes.get_index_of(&collection) {
400
+
let did = PdsFsEntry::Did(name.to_string_lossy().to_string());
401
+
if let Some(ino) = inodes.get_index_of(&did) {
402
+
drop(inodes);
403
+
reply.entry(&TTL, &self.attr(ino as u64), 0);
404
+
} else {
405
+
drop(inodes);
406
+
reply.error(libc::ENOENT)
407
+
}
408
+
}
409
+
Some(PdsFsEntry::Did(_)) => {
410
+
let col = PdsFsEntry::Collection(PdsFsCollection {
411
+
parent: parent as usize,
412
+
nsid: name.to_string_lossy().to_string(),
413
+
});
414
+
if let Some(ino) = inodes.get_index_of(&col) {
415
+
drop(inodes);
255
416
reply.entry(&TTL, &self.attr(ino as u64), 0);
256
417
} else {
418
+
drop(inodes);
257
419
reply.error(libc::ENOENT)
258
420
}
259
421
}
260
-
Some(PdsFsEntry::Collection(c)) => {
422
+
Some(PdsFsEntry::Collection(_)) => {
423
+
let name_str = name.to_string_lossy();
424
+
let rkey = name_str.strip_suffix(".json").unwrap_or(&name_str).to_string();
261
425
let record = PdsFsEntry::Record(PdsFsRecord {
262
-
collection: c.to_owned(),
263
-
rkey: name.to_string_lossy().to_string(),
426
+
parent: parent as usize,
427
+
rkey,
264
428
});
265
-
if let Some(ino) = self.inodes.get_index_of(&record) {
429
+
if let Some(ino) = inodes.get_index_of(&record) {
430
+
drop(inodes);
266
431
reply.entry(&TTL, &self.attr(ino as u64), 0);
267
432
} else {
433
+
drop(inodes);
268
434
reply.error(libc::ENOENT)
269
435
}
270
436
}
271
-
_ => reply.error(libc::ENOENT),
437
+
_ => {
438
+
drop(inodes);
439
+
reply.error(libc::ENOENT)
440
+
}
272
441
}
273
442
}
274
443
···
283
452
_lock: Option<u64>,
284
453
reply: fuser::ReplyData,
285
454
) {
286
-
let rt = tokio::runtime::Runtime::new().unwrap();
287
-
if let Some(PdsFsEntry::Record(r)) = self.inodes.get_index(ino as usize)
288
-
&& let Ok(Some(val)) = rt.block_on(self.repo.get_raw::<ipld_core::ipld::Ipld>(&r.key()))
289
-
{
290
-
reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]);
455
+
let inodes = self.inodes.lock().unwrap();
456
+
if let Some(PdsFsEntry::Record(r)) = inodes.get_index(ino as usize) {
457
+
let col = inodes[r.parent].unwrap_collection();
458
+
let did = inodes[col.parent].unwrap_did().clone();
459
+
let key = format!("{}/{}", col.nsid, r.rkey);
460
+
let cache_key = format!("{}/{}", did, key);
461
+
drop(inodes);
462
+
463
+
// Check content cache first (for new records from firehose)
464
+
{
465
+
let cache = self.content_cache.lock().unwrap();
466
+
if let Some(content) = cache.get(&cache_key) {
467
+
reply.data(&content.as_bytes()[offset as usize..]);
468
+
return;
469
+
}
470
+
}
471
+
472
+
// Fall back to repo
473
+
let mut repos = self.repos.lock().unwrap();
474
+
let repo = &mut repos[&did];
475
+
if let Ok(Some(val)) = self.rt.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) {
476
+
reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]);
477
+
return;
478
+
}
291
479
} else {
292
-
reply.error(libc::ENOENT);
480
+
drop(inodes);
293
481
}
482
+
reply.error(libc::ENOENT);
294
483
}
295
484
}
+113
-50
src/main.rs
+113
-50
src/main.rs
···
1
-
#![feature(let_chains)]
2
1
mod client;
3
2
mod error;
3
+
mod firehose;
4
4
mod fs;
5
5
mod resolver;
6
6
···
10
10
use atrium_repo::{Repository, blockstore::CarStore};
11
11
use atrium_xrpc_client::isahc::IsahcClient;
12
12
use fuser::MountOption;
13
-
use indicatif::{ProgressBar, ProgressStyle};
13
+
use futures::{StreamExt, stream};
14
+
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
14
15
use std::{
15
16
io::{Cursor, Write},
16
17
path::PathBuf,
18
+
sync::Arc,
17
19
};
18
-
use xdg::BaseDirectories;
19
20
20
21
fn main() {
21
22
let rt = tokio::runtime::Runtime::new().unwrap();
22
23
let matches = clap::command!()
23
-
.arg(clap::Arg::new("handle").index(1))
24
+
.arg(
25
+
clap::Arg::new("handles")
26
+
.index(1)
27
+
.required(true)
28
+
.num_args(1..)
29
+
.help("One or more handles to download and mount"),
30
+
)
24
31
.arg(
25
32
clap::Arg::new("mountpoint")
26
33
.short('m')
···
28
35
.value_parser(clap::value_parser!(PathBuf)),
29
36
)
30
37
.get_matches();
31
-
let handle = matches.get_one::<String>("handle").unwrap();
38
+
let handles = matches
39
+
.get_many::<String>("handles")
40
+
.unwrap()
41
+
.cloned()
42
+
.collect::<Vec<_>>();
32
43
let mountpoint = matches
33
44
.get_one::<PathBuf>("mountpoint")
34
45
.map(ToOwned::to_owned)
35
-
.unwrap_or(PathBuf::from(&handle));
46
+
.unwrap_or(PathBuf::from("mnt"));
36
47
let _ = std::fs::create_dir_all(&mountpoint);
37
-
let resolver = resolver::id_resolver();
38
-
let id = rt.block_on(async {
39
-
resolver
40
-
.resolve(&atrium_api::types::string::Handle::new(handle.into()).unwrap())
41
-
.await
42
-
.unwrap()
43
-
});
44
48
45
-
let pb = ProgressBar::new_spinner();
46
-
pb.set_style(
47
-
ProgressStyle::default_spinner()
48
-
.template("{spinner:.green} [{elapsed_precise}] {msg}")
49
-
.unwrap()
50
-
.tick_strings(&["โ ", "โ ", "โ น", "โ ธ", "โ ผ", "โ ด", "โ ฆ", "โ ง", "โ ", "โ "]),
49
+
let resolver = Arc::new(resolver::id_resolver());
50
+
let bars = Arc::new(MultiProgress::new());
51
+
let repos = rt.block_on(
52
+
stream::iter(handles)
53
+
.then(|handle| {
54
+
let h = handle.clone();
55
+
let r = Arc::clone(&resolver);
56
+
let b = Arc::clone(&bars);
57
+
async move {
58
+
let id = r.resolve(&h).await?;
59
+
let bytes = cached_download(&id, &b).await?;
60
+
let repo = build_repo(bytes).await?;
61
+
Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo))
62
+
}
63
+
})
64
+
.collect::<Vec<_>>(),
51
65
);
66
+
let (success, errors): (Vec<_>, Vec<_>) = repos.into_iter().partition(|r| r.is_ok());
67
+
for e in errors {
68
+
eprintln!("{:?}", e.as_ref().unwrap_err());
69
+
}
70
+
let repos_with_pds: Vec<_> = success
71
+
.into_iter()
72
+
.map(|s| s.unwrap())
73
+
.collect();
52
74
53
-
let bytes = rt.block_on(cached_download(&id, &pb)).unwrap();
54
-
let store = rt.block_on(async { CarStore::open(Cursor::new(bytes)).await.unwrap() });
55
-
let root = store.roots().next().unwrap();
56
-
let repo = rt.block_on(async { Repository::open(store, root).await.unwrap() });
75
+
// construct the fs
76
+
let mut fs = fs::PdsFs::new();
57
77
58
-
// construct the fs
59
-
let fs = rt.block_on(fs::PdsFs::new(repo));
78
+
// Extract (did, pds) pairs for WebSocket tasks before consuming repos
79
+
let did_pds_pairs: Vec<_> = repos_with_pds.iter()
80
+
.map(|(did, pds, _)| (did.clone(), pds.clone()))
81
+
.collect();
82
+
83
+
// Consume repos_with_pds to add repos to filesystem
84
+
for (did, _, repo) in repos_with_pds {
85
+
rt.block_on(fs.add(did, repo))
86
+
}
87
+
88
+
// get shared state for WebSocket tasks
89
+
let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state();
60
90
61
91
// mount
62
-
let options = vec![MountOption::RO, MountOption::FSName("pdsfs".to_string())];
63
-
let join_handle = fuser::spawn_mount2(fs, &mountpoint, &options).unwrap();
92
+
let options = vec![
93
+
MountOption::RO,
94
+
MountOption::FSName("pdsfs".to_string()),
95
+
MountOption::AllowOther,
96
+
MountOption::CUSTOM("local".to_string()),
97
+
MountOption::CUSTOM("volname=pdsfs".to_string()),
98
+
];
99
+
100
+
// Create session and get notifier for Finder refresh
101
+
let session = fuser::Session::new(fs, &mountpoint, &options).unwrap();
102
+
let notifier = session.notifier();
103
+
let _bg = session.spawn().unwrap();
104
+
105
+
// spawn WebSocket subscription tasks for each DID using the runtime handle
106
+
let rt_handle = rt.handle().clone();
107
+
for (did, pds) in did_pds_pairs {
108
+
let inodes_clone = Arc::clone(&inodes_arc);
109
+
let sizes_clone = Arc::clone(&sizes_arc);
110
+
let content_cache_clone = Arc::clone(&content_cache_arc);
111
+
let notifier_clone = notifier.clone();
112
+
113
+
rt_handle.spawn(async move {
114
+
if let Err(e) = firehose::subscribe_to_repo::<atrium_repo::blockstore::CarStore<std::io::Cursor<Vec<u8>>>>(
115
+
did,
116
+
pds,
117
+
inodes_clone,
118
+
sizes_clone,
119
+
content_cache_clone,
120
+
notifier_clone,
121
+
).await {
122
+
eprintln!("WebSocket error: {:?}", e);
123
+
}
124
+
});
125
+
}
64
126
65
127
println!("mounted at {mountpoint:?}");
66
128
print!("hit enter to unmount and exit...");
···
70
132
let mut input = String::new();
71
133
std::io::stdin().read_line(&mut input).unwrap();
72
134
73
-
join_handle.join();
74
-
std::fs::remove_dir(&mountpoint).unwrap();
75
-
76
135
println!("unmounted {mountpoint:?}");
77
136
}
78
137
79
-
async fn cached_download(id: &ResolvedIdentity, pb: &ProgressBar) -> Result<Vec<u8>, error::Error> {
80
-
let dirs = BaseDirectories::new();
138
+
async fn cached_download(
139
+
id: &ResolvedIdentity,
140
+
m: &MultiProgress,
141
+
) -> Result<Vec<u8>, error::Error> {
142
+
let mut pb = ProgressBar::new_spinner();
143
+
pb.set_style(
144
+
ProgressStyle::default_spinner()
145
+
.template("{spinner:.green} [{elapsed_precise}] {msg}")
146
+
.unwrap()
147
+
.tick_strings(&["โ ", "โ ", "โ น", "โ ธ", "โ ผ", "โ ด", "โ ฆ", "โ ง", "โ ", "โ "]),
148
+
);
149
+
pb.enable_steady_tick(std::time::Duration::from_millis(100));
150
+
pb = m.add(pb);
81
151
82
-
let dir = dirs.get_data_home().expect("$HOME is absent").join("pdsfs");
83
-
tokio::fs::create_dir_all(&dir).await?;
152
+
// Always download fresh - no caching for now to ensure up-to-date data
153
+
pb.set_message(format!("downloading CAR file for...{}", id.did));
154
+
let bytes = download_car_file(id, &pb).await?;
84
155
85
-
let file = dir.join(&id.did);
86
-
let exists = std::fs::exists(&file)?;
87
-
88
-
let bytes = if !exists {
89
-
pb.set_message(format!("downloading CAR file for\t...\t{}", id.did));
90
-
download_car_file(id, pb).await?
91
-
} else {
92
-
pb.set_message(format!("using cached CAR file for\t...\t{}", id.did));
93
-
tokio::fs::read(file).await?
94
-
};
95
-
96
-
pb.set_message(format!(
97
-
"received {} bytes for \t...\t{}",
98
-
bytes.len(),
99
-
id.did
100
-
));
156
+
pb.finish();
101
157
Ok(bytes)
102
158
}
103
159
···
123
179
124
180
Ok(bytes)
125
181
}
182
+
183
+
async fn build_repo(bytes: Vec<u8>) -> Result<Repository<CarStore<Cursor<Vec<u8>>>>, error::Error> {
184
+
let store = CarStore::open(Cursor::new(bytes)).await?;
185
+
let root = store.roots().next().unwrap();
186
+
let repo = Repository::open(store, root).await?;
187
+
Ok(repo)
188
+
}