+101
-7
Cargo.lock
+101
-7
Cargo.lock
···
98
98
]
99
99
100
100
[[package]]
101
+
name = "anyhow"
102
+
version = "1.0.100"
103
+
source = "registry+https://github.com/rust-lang/crates.io-index"
104
+
checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
105
+
106
+
[[package]]
101
107
name = "async-channel"
102
108
version = "1.9.0"
103
109
source = "registry+https://github.com/rust-lang/crates.io-index"
···
327
333
version = "3.19.0"
328
334
source = "registry+https://github.com/rust-lang/crates.io-index"
329
335
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
336
+
337
+
[[package]]
338
+
name = "byteorder"
339
+
version = "1.5.0"
340
+
source = "registry+https://github.com/rust-lang/crates.io-index"
341
+
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
330
342
331
343
[[package]]
332
344
name = "bytes"
···
669
681
checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad"
670
682
dependencies = [
671
683
"libc",
672
-
"windows-sys 0.59.0",
684
+
"windows-sys 0.60.2",
673
685
]
674
686
675
687
[[package]]
···
991
1003
"idna",
992
1004
"ipnet",
993
1005
"once_cell",
994
-
"rand",
1006
+
"rand 0.9.2",
995
1007
"ring",
996
1008
"thiserror 2.0.12",
997
1009
"tinyvec",
···
1013
1025
"moka",
1014
1026
"once_cell",
1015
1027
"parking_lot",
1016
-
"rand",
1028
+
"rand 0.9.2",
1017
1029
"resolv-conf",
1018
1030
"smallvec",
1019
1031
"thiserror 2.0.12",
···
1757
1769
name = "pdsfs"
1758
1770
version = "0.1.0"
1759
1771
dependencies = [
1772
+
"anyhow",
1760
1773
"atrium-api",
1761
1774
"atrium-common",
1762
1775
"atrium-identity",
···
1776
1789
"serde_json",
1777
1790
"thiserror 2.0.12",
1778
1791
"tokio",
1792
+
"tokio-tungstenite",
1779
1793
"xdg",
1780
1794
]
1781
1795
···
1889
1903
1890
1904
[[package]]
1891
1905
name = "rand"
1906
+
version = "0.8.5"
1907
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1908
+
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
1909
+
dependencies = [
1910
+
"libc",
1911
+
"rand_chacha 0.3.1",
1912
+
"rand_core 0.6.4",
1913
+
]
1914
+
1915
+
[[package]]
1916
+
name = "rand"
1892
1917
version = "0.9.2"
1893
1918
source = "registry+https://github.com/rust-lang/crates.io-index"
1894
1919
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
1895
1920
dependencies = [
1896
-
"rand_chacha",
1897
-
"rand_core",
1921
+
"rand_chacha 0.9.0",
1922
+
"rand_core 0.9.3",
1923
+
]
1924
+
1925
+
[[package]]
1926
+
name = "rand_chacha"
1927
+
version = "0.3.1"
1928
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1929
+
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
1930
+
dependencies = [
1931
+
"ppv-lite86",
1932
+
"rand_core 0.6.4",
1898
1933
]
1899
1934
1900
1935
[[package]]
···
1904
1939
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
1905
1940
dependencies = [
1906
1941
"ppv-lite86",
1907
-
"rand_core",
1942
+
"rand_core 0.9.3",
1943
+
]
1944
+
1945
+
[[package]]
1946
+
name = "rand_core"
1947
+
version = "0.6.4"
1948
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1949
+
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
1950
+
dependencies = [
1951
+
"getrandom 0.2.16",
1908
1952
]
1909
1953
1910
1954
[[package]]
···
2057
2101
"errno",
2058
2102
"libc",
2059
2103
"linux-raw-sys",
2060
-
"windows-sys 0.59.0",
2104
+
"windows-sys 0.60.2",
2061
2105
]
2062
2106
2063
2107
[[package]]
···
2234
2278
]
2235
2279
2236
2280
[[package]]
2281
+
name = "sha1"
2282
+
version = "0.10.6"
2283
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2284
+
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
2285
+
dependencies = [
2286
+
"cfg-if",
2287
+
"cpufeatures",
2288
+
"digest",
2289
+
]
2290
+
2291
+
[[package]]
2237
2292
name = "sha2"
2238
2293
version = "0.10.9"
2239
2294
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2515
2570
]
2516
2571
2517
2572
[[package]]
2573
+
name = "tokio-tungstenite"
2574
+
version = "0.24.0"
2575
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2576
+
checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9"
2577
+
dependencies = [
2578
+
"futures-util",
2579
+
"log",
2580
+
"native-tls",
2581
+
"tokio",
2582
+
"tokio-native-tls",
2583
+
"tungstenite",
2584
+
]
2585
+
2586
+
[[package]]
2518
2587
name = "tokio-util"
2519
2588
version = "0.7.15"
2520
2589
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2663
2732
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
2664
2733
2665
2734
[[package]]
2735
+
name = "tungstenite"
2736
+
version = "0.24.0"
2737
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2738
+
checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
2739
+
dependencies = [
2740
+
"byteorder",
2741
+
"bytes",
2742
+
"data-encoding",
2743
+
"http 1.3.1",
2744
+
"httparse",
2745
+
"log",
2746
+
"native-tls",
2747
+
"rand 0.8.5",
2748
+
"sha1",
2749
+
"thiserror 1.0.69",
2750
+
"utf-8",
2751
+
]
2752
+
2753
+
[[package]]
2666
2754
name = "typenum"
2667
2755
version = "1.18.0"
2668
2756
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2712
2800
"idna",
2713
2801
"percent-encoding",
2714
2802
]
2803
+
2804
+
[[package]]
2805
+
name = "utf-8"
2806
+
version = "0.7.6"
2807
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2808
+
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
2715
2809
2716
2810
[[package]]
2717
2811
name = "utf8_iter"
+4
-2
Cargo.toml
+4
-2
Cargo.toml
···
4
4
edition = "2024"
5
5
6
6
[dependencies]
7
+
anyhow = "1.0"
7
8
atrium-api = "0.25.4"
8
9
atrium-common = "0.1.2"
9
10
atrium-identity = "0.1.5"
···
11
12
atrium-xrpc = "0.12.3"
12
13
atrium-xrpc-client = { version = "0.5.14", features=["isahc"] }
13
14
clap = { version = "4.5.41", features = ["cargo"] }
14
-
fuser = "0.15.1"
15
+
fuser = { version = "0.15.1", features = ["abi-7-18"] }
15
16
futures = "0.3.31"
16
17
hickory-resolver = "0.25.2"
17
18
indexmap = "2.10.0"
···
22
23
serde_ipld_dagcbor = "0.6.3"
23
24
serde_json = "1.0.141"
24
25
thiserror = "2.0.12"
25
-
tokio = { version = "1.46.1", features = ["fs"] }
26
+
tokio = { version = "1.46.1", features = ["fs", "sync", "rt-multi-thread"] }
27
+
tokio-tungstenite = { version = "0.24", features = ["native-tls"] }
26
28
xdg = "3.0.0"
+96
flake.lock
+96
flake.lock
···
1
+
{
2
+
"nodes": {
3
+
"flake-utils": {
4
+
"inputs": {
5
+
"systems": "systems"
6
+
},
7
+
"locked": {
8
+
"lastModified": 1731533236,
9
+
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
10
+
"owner": "numtide",
11
+
"repo": "flake-utils",
12
+
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
13
+
"type": "github"
14
+
},
15
+
"original": {
16
+
"owner": "numtide",
17
+
"repo": "flake-utils",
18
+
"type": "github"
19
+
}
20
+
},
21
+
"nixpkgs": {
22
+
"locked": {
23
+
"lastModified": 1757487488,
24
+
"narHash": "sha256-zwE/e7CuPJUWKdvvTCB7iunV4E/+G0lKfv4kk/5Izdg=",
25
+
"owner": "NixOS",
26
+
"repo": "nixpkgs",
27
+
"rev": "ab0f3607a6c7486ea22229b92ed2d355f1482ee0",
28
+
"type": "github"
29
+
},
30
+
"original": {
31
+
"owner": "NixOS",
32
+
"ref": "nixos-unstable",
33
+
"repo": "nixpkgs",
34
+
"type": "github"
35
+
}
36
+
},
37
+
"nixpkgs_2": {
38
+
"locked": {
39
+
"lastModified": 1744536153,
40
+
"narHash": "sha256-awS2zRgF4uTwrOKwwiJcByDzDOdo3Q1rPZbiHQg/N38=",
41
+
"owner": "NixOS",
42
+
"repo": "nixpkgs",
43
+
"rev": "18dd725c29603f582cf1900e0d25f9f1063dbf11",
44
+
"type": "github"
45
+
},
46
+
"original": {
47
+
"owner": "NixOS",
48
+
"ref": "nixpkgs-unstable",
49
+
"repo": "nixpkgs",
50
+
"type": "github"
51
+
}
52
+
},
53
+
"root": {
54
+
"inputs": {
55
+
"flake-utils": "flake-utils",
56
+
"nixpkgs": "nixpkgs",
57
+
"rust-overlay": "rust-overlay"
58
+
}
59
+
},
60
+
"rust-overlay": {
61
+
"inputs": {
62
+
"nixpkgs": "nixpkgs_2"
63
+
},
64
+
"locked": {
65
+
"lastModified": 1757644300,
66
+
"narHash": "sha256-bVIDYz31bCdZId441sqgkImnA7aYr2UQzQlyl+O2DWc=",
67
+
"owner": "oxalica",
68
+
"repo": "rust-overlay",
69
+
"rev": "591c5ae84f066bdfc9797b217df392d58eafd088",
70
+
"type": "github"
71
+
},
72
+
"original": {
73
+
"owner": "oxalica",
74
+
"repo": "rust-overlay",
75
+
"type": "github"
76
+
}
77
+
},
78
+
"systems": {
79
+
"locked": {
80
+
"lastModified": 1681028828,
81
+
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
82
+
"owner": "nix-systems",
83
+
"repo": "default",
84
+
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
85
+
"type": "github"
86
+
},
87
+
"original": {
88
+
"owner": "nix-systems",
89
+
"repo": "default",
90
+
"type": "github"
91
+
}
92
+
}
93
+
},
94
+
"root": "root",
95
+
"version": 7
96
+
}
+92
flake.nix
+92
flake.nix
···
1
+
{
2
+
description = "pdsfs - mount an atproto PDS repository as a FUSE filesystem";
3
+
4
+
inputs = {
5
+
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
6
+
rust-overlay.url = "github:oxalica/rust-overlay";
7
+
flake-utils.url = "github:numtide/flake-utils";
8
+
};
9
+
10
+
outputs = { self, nixpkgs, rust-overlay, flake-utils }:
11
+
flake-utils.lib.eachDefaultSystem (system:
12
+
let
13
+
overlays = [ (import rust-overlay) ];
14
+
pkgs = import nixpkgs {
15
+
inherit system overlays;
16
+
};
17
+
18
+
rustToolchain =
19
+
if builtins.pathExists ./rust-toolchain.toml
20
+
then pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml
21
+
else pkgs.rust-bin.stable.latest.default.override {
22
+
extensions = [ "rust-src" "rust-analyzer" ];
23
+
};
24
+
25
+
nativeBuildInputs = with pkgs; [
26
+
rustToolchain
27
+
pkg-config
28
+
];
29
+
30
+
buildInputs = with pkgs; [
31
+
fuse3
32
+
openssl
33
+
] ++ lib.optionals stdenv.isDarwin [
34
+
darwin.apple_sdk.frameworks.Security
35
+
darwin.apple_sdk.frameworks.CoreServices
36
+
];
37
+
38
+
pdsfs = pkgs.rustPlatform.buildRustPackage {
39
+
pname = "pdsfs";
40
+
version = "0.1.0";
41
+
42
+
src = ./.;
43
+
44
+
cargoLock = {
45
+
lockFile = ./Cargo.lock;
46
+
};
47
+
48
+
inherit nativeBuildInputs buildInputs;
49
+
50
+
# Skip tests that require network access or FUSE capabilities
51
+
doCheck = false;
52
+
53
+
meta = with pkgs.lib; {
54
+
description = "Mount an atproto PDS repository as a FUSE filesystem";
55
+
homepage = "https://github.com/tangled/pdsfs"; # Update with actual repository URL
56
+
license = licenses.mit; # Update with actual license
57
+
maintainers = [ ]; # Add maintainers if desired
58
+
platforms = platforms.linux ++ platforms.darwin;
59
+
};
60
+
};
61
+
in
62
+
{
63
+
packages = {
64
+
default = pdsfs;
65
+
pdsfs = pdsfs;
66
+
};
67
+
68
+
devShells.default = pkgs.mkShell {
69
+
inherit buildInputs nativeBuildInputs;
70
+
71
+
shellHook = ''
72
+
echo "pdsfs development environment"
73
+
echo "Run 'cargo build' to build the project"
74
+
echo "Run 'cargo run -- <handle>' to mount a PDS repository"
75
+
echo ""
76
+
echo "FUSE development notes:"
77
+
echo "- Ensure you have FUSE permissions (add user to 'fuse' group if needed)"
78
+
echo "- Use 'fusermount -u <mountpoint>' to unmount if needed"
79
+
'';
80
+
};
81
+
82
+
# Allow building with different Rust versions
83
+
devShells.rust-nightly = pkgs.mkShell {
84
+
buildInputs = buildInputs ++ [
85
+
(pkgs.rust-bin.nightly.latest.default.override {
86
+
extensions = [ "rust-src" "rust-analyzer" ];
87
+
})
88
+
pkgs.pkg-config
89
+
];
90
+
};
91
+
});
92
+
}
+318
src/firehose.rs
+318
src/firehose.rs
···
1
+
use anyhow::{anyhow, Result};
2
+
use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID};
3
+
use atrium_api::client::AtpServiceClient;
4
+
use atrium_api::com;
5
+
use atrium_api::types;
6
+
use atrium_xrpc_client::isahc::IsahcClient;
7
+
use futures::StreamExt;
8
+
use ipld_core::ipld::Ipld;
9
+
use std::io::Cursor;
10
+
use std::sync::{Arc, Mutex};
11
+
use tokio_tungstenite::connect_async;
12
+
use tokio_tungstenite::tungstenite::Message;
13
+
14
+
use crate::fs::{PdsFsCollection, PdsFsEntry, PdsFsRecord};
15
+
use indexmap::{IndexMap, IndexSet};
16
+
17
+
/// Frame header types for WebSocket messages
18
+
#[derive(Debug, Clone, PartialEq, Eq)]
19
+
enum FrameHeader {
20
+
Message(Option<String>),
21
+
Error,
22
+
}
23
+
24
+
impl TryFrom<Ipld> for FrameHeader {
25
+
type Error = anyhow::Error;
26
+
27
+
fn try_from(value: Ipld) -> Result<Self> {
28
+
if let Ipld::Map(map) = value {
29
+
if let Some(Ipld::Integer(i)) = map.get("op") {
30
+
match i {
31
+
1 => {
32
+
let t = if let Some(Ipld::String(s)) = map.get("t") {
33
+
Some(s.clone())
34
+
} else {
35
+
None
36
+
};
37
+
return Ok(FrameHeader::Message(t));
38
+
}
39
+
-1 => return Ok(FrameHeader::Error),
40
+
_ => {}
41
+
}
42
+
}
43
+
}
44
+
Err(anyhow!("invalid frame type"))
45
+
}
46
+
}
47
+
48
+
/// Frame types for parsed WebSocket messages
49
+
#[derive(Debug, Clone, PartialEq, Eq)]
50
+
pub enum Frame {
51
+
Message(Option<String>, MessageFrame),
52
+
Error(ErrorFrame),
53
+
}
54
+
55
+
#[derive(Debug, Clone, PartialEq, Eq)]
56
+
pub struct MessageFrame {
57
+
pub body: Vec<u8>,
58
+
}
59
+
60
+
#[derive(Debug, Clone, PartialEq, Eq)]
61
+
pub struct ErrorFrame {}
62
+
63
+
impl TryFrom<&[u8]> for Frame {
64
+
type Error = anyhow::Error;
65
+
66
+
fn try_from(value: &[u8]) -> Result<Self> {
67
+
let mut cursor = Cursor::new(value);
68
+
let (left, right) = match serde_ipld_dagcbor::from_reader::<Ipld, _>(&mut cursor) {
69
+
Err(serde_ipld_dagcbor::DecodeError::TrailingData) => {
70
+
value.split_at(cursor.position() as usize)
71
+
}
72
+
_ => {
73
+
return Err(anyhow!("invalid frame type"));
74
+
}
75
+
};
76
+
let header = FrameHeader::try_from(serde_ipld_dagcbor::from_slice::<Ipld>(left)?)?;
77
+
if let FrameHeader::Message(t) = &header {
78
+
Ok(Frame::Message(t.clone(), MessageFrame { body: right.to_vec() }))
79
+
} else {
80
+
Ok(Frame::Error(ErrorFrame {}))
81
+
}
82
+
}
83
+
}
84
+
85
+
/// Subscribe to a repo's firehose and update inodes on changes
86
+
pub async fn subscribe_to_repo<R>(
87
+
did: String,
88
+
pds: String,
89
+
inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>,
90
+
sizes: Arc<Mutex<IndexMap<usize, u64>>>,
91
+
content_cache: Arc<Mutex<IndexMap<String, String>>>,
92
+
notifier: fuser::Notifier,
93
+
) -> Result<()>
94
+
where
95
+
R: atrium_repo::blockstore::AsyncBlockStoreRead,
96
+
{
97
+
// Strip https:// or http:// prefix from PDS URL if present
98
+
let pds_host = pds.trim_start_matches("https://").trim_start_matches("http://");
99
+
let url = format!("wss://{}/xrpc/{}", pds_host, NSID);
100
+
println!("Connecting to firehose: {}", url);
101
+
102
+
let (mut stream, _) = connect_async(url).await?;
103
+
println!("Connected to firehose for {}", did);
104
+
105
+
loop {
106
+
match stream.next().await {
107
+
Some(Ok(Message::Binary(data))) => {
108
+
if let Ok(Frame::Message(Some(t), msg)) = Frame::try_from(data.as_slice()) {
109
+
if t.as_str() == "#commit" {
110
+
if let Ok(commit) = serde_ipld_dagcbor::from_reader::<Commit, _>(msg.body.as_slice()) {
111
+
// Only process commits for our DID
112
+
if commit.repo.as_str() == did {
113
+
if let Err(e) = handle_commit(&commit, &inodes, &sizes, &content_cache, &did, &pds, ¬ifier).await {
114
+
eprintln!("Error handling commit: {:?}", e);
115
+
}
116
+
}
117
+
}
118
+
}
119
+
}
120
+
}
121
+
Some(Ok(_)) => {} // Ignore other message types
122
+
Some(Err(e)) => {
123
+
eprintln!("WebSocket error: {}", e);
124
+
break;
125
+
}
126
+
None => {
127
+
eprintln!("WebSocket closed");
128
+
break;
129
+
}
130
+
}
131
+
}
132
+
133
+
Ok(())
134
+
}
135
+
136
+
/// Handle a commit by updating the inode tree and notifying Finder
137
+
async fn handle_commit(
138
+
commit: &Commit,
139
+
inodes: &Arc<Mutex<IndexSet<PdsFsEntry>>>,
140
+
sizes: &Arc<Mutex<IndexMap<usize, u64>>>,
141
+
content_cache: &Arc<Mutex<IndexMap<String, String>>>,
142
+
did: &str,
143
+
pds: &str,
144
+
notifier: &fuser::Notifier,
145
+
) -> Result<()> {
146
+
// Find the DID inode
147
+
let did_entry = PdsFsEntry::Did(did.to_string());
148
+
let did_inode = {
149
+
let inodes_lock = inodes.lock().unwrap();
150
+
inodes_lock.get_index_of(&did_entry)
151
+
};
152
+
153
+
let Some(did_inode) = did_inode else {
154
+
return Err(anyhow!("DID not found in inodes"));
155
+
};
156
+
157
+
for op in &commit.ops {
158
+
let Some((collection, rkey)) = op.path.split_once('/') else {
159
+
continue;
160
+
};
161
+
162
+
match op.action.as_str() {
163
+
"create" => {
164
+
// Fetch the record from PDS
165
+
let record_key = format!("{}/{}", collection, rkey);
166
+
let cache_key = format!("{}/{}", did, record_key);
167
+
168
+
// Fetch record content from PDS
169
+
match fetch_record(pds, did, collection, rkey).await {
170
+
Ok(content) => {
171
+
let content_len = content.len() as u64;
172
+
173
+
// Add the record to inodes
174
+
let (collection_inode, record_inode) = {
175
+
let mut inodes_lock = inodes.lock().unwrap();
176
+
177
+
// Ensure collection exists
178
+
let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
179
+
parent: did_inode,
180
+
nsid: collection.to_string(),
181
+
});
182
+
let (collection_inode, _) = inodes_lock.insert_full(collection_entry);
183
+
184
+
// Add the record
185
+
let record_entry = PdsFsEntry::Record(PdsFsRecord {
186
+
parent: collection_inode,
187
+
rkey: rkey.to_string(),
188
+
});
189
+
let (record_inode, _) = inodes_lock.insert_full(record_entry);
190
+
(collection_inode, record_inode)
191
+
};
192
+
193
+
// Cache the content and size
194
+
content_cache.lock().unwrap().insert(cache_key, content);
195
+
sizes.lock().unwrap().insert(record_inode, content_len);
196
+
197
+
// Notify Finder about the new file (release lock first)
198
+
let filename = format!("{}.json", rkey);
199
+
if let Err(e) = notifier.inval_entry(collection_inode as u64, filename.as_ref()) {
200
+
eprintln!("Failed to invalidate entry for {}: {}", filename, e);
201
+
}
202
+
203
+
println!("Created: {}/{}", collection, rkey);
204
+
}
205
+
Err(e) => {
206
+
eprintln!("Failed to fetch record {}/{}: {}", collection, rkey, e);
207
+
}
208
+
}
209
+
}
210
+
"delete" => {
211
+
// Get inodes before removing
212
+
let (collection_inode_opt, child_inode_opt) = {
213
+
let mut inodes_lock = inodes.lock().unwrap();
214
+
215
+
// Find the collection
216
+
let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
217
+
parent: did_inode,
218
+
nsid: collection.to_string(),
219
+
});
220
+
let collection_inode = inodes_lock.get_index_of(&collection_entry);
221
+
222
+
// Find and remove the record
223
+
let child_inode = if let Some(coll_ino) = collection_inode {
224
+
let record_entry = PdsFsEntry::Record(PdsFsRecord {
225
+
parent: coll_ino,
226
+
rkey: rkey.to_string(),
227
+
});
228
+
let child_ino = inodes_lock.get_index_of(&record_entry);
229
+
inodes_lock.shift_remove(&record_entry);
230
+
child_ino
231
+
} else {
232
+
None
233
+
};
234
+
235
+
(collection_inode, child_inode)
236
+
};
237
+
238
+
// Notify Finder about the deletion (release lock first)
239
+
if let (Some(coll_ino), Some(child_ino)) = (collection_inode_opt, child_inode_opt) {
240
+
// Remove from caches
241
+
sizes.lock().unwrap().shift_remove(&child_ino);
242
+
let cache_key = format!("{}/{}/{}", did, collection, rkey);
243
+
content_cache.lock().unwrap().shift_remove(&cache_key);
244
+
245
+
let filename = format!("{}.json", rkey);
246
+
if let Err(e) = notifier.delete(coll_ino as u64, child_ino as u64, filename.as_ref()) {
247
+
eprintln!("Failed to notify deletion for {}: {}", filename, e);
248
+
}
249
+
}
250
+
251
+
println!("Deleted: {}/{}", collection, rkey);
252
+
}
253
+
"update" => {
254
+
// For updates, invalidate the inode so content is re-fetched
255
+
let record_inode_opt = {
256
+
let inodes_lock = inodes.lock().unwrap();
257
+
let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
258
+
parent: did_inode,
259
+
nsid: collection.to_string(),
260
+
});
261
+
262
+
if let Some(collection_inode) = inodes_lock.get_index_of(&collection_entry) {
263
+
let record_entry = PdsFsEntry::Record(PdsFsRecord {
264
+
parent: collection_inode,
265
+
rkey: rkey.to_string(),
266
+
});
267
+
inodes_lock.get_index_of(&record_entry)
268
+
} else {
269
+
None
270
+
}
271
+
};
272
+
273
+
// Notify Finder to invalidate the inode (release lock first)
274
+
if let Some(record_ino) = record_inode_opt {
275
+
// Clear caches so content is recalculated
276
+
sizes.lock().unwrap().shift_remove(&record_ino);
277
+
let cache_key = format!("{}/{}/{}", did, collection, rkey);
278
+
content_cache.lock().unwrap().shift_remove(&cache_key);
279
+
280
+
// Invalidate the entire inode (metadata and all data)
281
+
if let Err(e) = notifier.inval_inode(record_ino as u64, 0, 0) {
282
+
eprintln!("Failed to invalidate inode for {}/{}: {}", collection, rkey, e);
283
+
}
284
+
}
285
+
286
+
println!("Updated: {}/{}", collection, rkey);
287
+
}
288
+
_ => {}
289
+
}
290
+
}
291
+
292
+
Ok(())
293
+
}
294
+
295
+
/// Fetch a record from the PDS
296
+
async fn fetch_record(pds: &str, did: &str, collection: &str, rkey: &str) -> Result<String> {
297
+
let client = AtpServiceClient::new(IsahcClient::new(pds));
298
+
let did = types::string::Did::new(did.to_string()).map_err(|e| anyhow!(e))?;
299
+
let collection_nsid = types::string::Nsid::new(collection.to_string()).map_err(|e| anyhow!(e))?;
300
+
let record_key = types::string::RecordKey::new(rkey.to_string()).map_err(|e| anyhow!(e))?;
301
+
302
+
let response = client
303
+
.service
304
+
.com
305
+
.atproto
306
+
.repo
307
+
.get_record(com::atproto::repo::get_record::Parameters::from(
308
+
com::atproto::repo::get_record::ParametersData {
309
+
cid: None,
310
+
collection: collection_nsid,
311
+
repo: types::string::AtIdentifier::Did(did),
312
+
rkey: record_key,
313
+
}
314
+
))
315
+
.await?;
316
+
317
+
Ok(serde_json::to_string_pretty(&response.value)?)
318
+
}
+154
-64
src/fs.rs
+154
-64
src/fs.rs
···
1
-
use std::time;
1
+
use std::sync::{Arc, Mutex};
2
+
use std::time::{self, SystemTime, UNIX_EPOCH, Duration};
2
3
3
4
use atrium_repo::{Repository, blockstore::AsyncBlockStoreRead};
4
5
use futures::StreamExt;
···
6
7
7
8
type Inode = usize;
8
9
10
+
/// Decode a TID (timestamp identifier) to get the timestamp in microseconds since Unix epoch
11
+
fn tid_to_timestamp(tid: &str) -> Option<SystemTime> {
12
+
const S32_CHAR: &[u8] = b"234567abcdefghijklmnopqrstuvwxyz";
13
+
14
+
if tid.len() != 13 {
15
+
return None;
16
+
}
17
+
18
+
let mut value: u64 = 0;
19
+
for ch in tid.chars() {
20
+
let pos = S32_CHAR.iter().position(|&c| c as char == ch)?;
21
+
// Big-endian: first character is most significant
22
+
value = (value << 5) | (pos as u64);
23
+
}
24
+
25
+
// Extract timestamp from upper bits (shifted by 10)
26
+
let micros = value >> 10;
27
+
28
+
UNIX_EPOCH.checked_add(Duration::from_micros(micros))
29
+
}
30
+
9
31
pub struct PdsFs<R> {
10
-
repos: IndexMap<String, Repository<R>>,
11
-
inodes: IndexSet<PdsFsEntry>,
32
+
repos: Arc<Mutex<IndexMap<String, Repository<R>>>>,
33
+
inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>,
34
+
sizes: Arc<Mutex<IndexMap<Inode, u64>>>,
35
+
content_cache: Arc<Mutex<IndexMap<String, String>>>,
36
+
rt: tokio::runtime::Runtime,
12
37
}
13
38
14
39
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
···
46
71
47
72
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
48
73
pub struct PdsFsCollection {
49
-
parent: Inode,
50
-
nsid: String,
74
+
pub parent: Inode,
75
+
pub nsid: String,
51
76
}
52
77
53
78
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
54
79
pub struct PdsFsRecord {
55
-
parent: Inode,
56
-
rkey: String,
80
+
pub parent: Inode,
81
+
pub rkey: String,
57
82
}
58
83
59
84
// impl PdsFsRecord {
···
89
114
{
90
115
pub fn new() -> Self {
91
116
PdsFs {
92
-
repos: Default::default(),
93
-
inodes: IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]),
117
+
repos: Arc::new(Mutex::new(Default::default())),
118
+
inodes: Arc::new(Mutex::new(IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]))),
119
+
sizes: Arc::new(Mutex::new(Default::default())),
120
+
content_cache: Arc::new(Mutex::new(Default::default())),
121
+
rt: tokio::runtime::Runtime::new().unwrap(),
94
122
}
95
123
}
96
124
125
+
pub fn get_shared_state(&self) -> (Arc<Mutex<IndexMap<String, Repository<R>>>>, Arc<Mutex<IndexSet<PdsFsEntry>>>, Arc<Mutex<IndexMap<Inode, u64>>>, Arc<Mutex<IndexMap<String, String>>>) {
126
+
(Arc::clone(&self.repos), Arc::clone(&self.inodes), Arc::clone(&self.sizes), Arc::clone(&self.content_cache))
127
+
}
128
+
97
129
pub async fn add(&mut self, did: String, mut repo: Repository<R>) {
98
130
let mut mst = repo.tree();
99
131
100
-
let (did_inode, _) = self.inodes.insert_full(PdsFsEntry::Did(did.clone()));
132
+
let did_inode = {
133
+
let mut inodes = self.inodes.lock().unwrap();
134
+
let (did_inode, _) = inodes.insert_full(PdsFsEntry::Did(did.clone()));
135
+
did_inode
136
+
};
101
137
102
138
let mut keys = Box::pin(mst.keys());
103
139
while let Some(Ok(key)) = keys.next().await {
104
140
if let Some((collection_name, rkey)) = key.split_once("/") {
105
-
let (collection_inode, _) =
106
-
self.inodes
107
-
.insert_full(PdsFsEntry::Collection(PdsFsCollection {
108
-
parent: did_inode,
109
-
nsid: collection_name.to_owned(),
110
-
}));
141
+
let mut inodes = self.inodes.lock().unwrap();
142
+
let (collection_inode, _) = inodes.insert_full(PdsFsEntry::Collection(PdsFsCollection {
143
+
parent: did_inode,
144
+
nsid: collection_name.to_owned(),
145
+
}));
111
146
112
-
self.inodes.insert(PdsFsEntry::Record(PdsFsRecord {
147
+
inodes.insert(PdsFsEntry::Record(PdsFsRecord {
113
148
parent: collection_inode,
114
149
rkey: rkey.to_owned(),
115
150
}));
···
119
154
drop(keys);
120
155
drop(mst);
121
156
122
-
self.repos.insert(did, repo);
157
+
self.repos.lock().unwrap().insert(did, repo);
123
158
}
124
159
125
160
fn attr(&mut self, ino: u64) -> fuser::FileAttr {
126
-
match self.inodes.get_index(ino as usize) {
161
+
let inodes = self.inodes.lock().unwrap();
162
+
match inodes.get_index(ino as usize) {
127
163
Some(PdsFsEntry::Root) => ROOTDIR_ATTR,
128
164
Some(PdsFsEntry::Collection(_)) => fuser::FileAttr {
129
165
ino,
···
160
196
blksize: BLKSIZE,
161
197
},
162
198
Some(PdsFsEntry::Record(r)) => {
163
-
let col = self.inodes[r.parent].unwrap_collection();
164
-
let did = self.inodes[col.parent].unwrap_did();
165
-
let repo = &mut self.repos[did];
166
-
let key = format!("{}/{}", col.nsid, r.rkey);
167
-
let rt = tokio::runtime::Runtime::new().unwrap();
168
-
let size = rt
169
-
.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key))
170
-
.ok()
171
-
.flatten()
172
-
.map_or(0, |v| serde_json::to_string(&v).unwrap().len())
173
-
as u64;
199
+
let col = inodes[r.parent].unwrap_collection();
200
+
let did = inodes[col.parent].unwrap_did().clone();
201
+
let rkey = r.rkey.clone();
202
+
let collection_nsid = col.nsid.clone();
203
+
drop(inodes);
204
+
205
+
// Check cache first
206
+
let size = {
207
+
let sizes = self.sizes.lock().unwrap();
208
+
if let Some(&cached_size) = sizes.get(&(ino as usize)) {
209
+
cached_size
210
+
} else {
211
+
drop(sizes);
212
+
// Not in cache, try to fetch from repo
213
+
let mut repos = self.repos.lock().unwrap();
214
+
let repo = &mut repos[&did];
215
+
let key = format!("{}/{}", collection_nsid, rkey);
216
+
let size = self
217
+
.rt
218
+
.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key))
219
+
.ok()
220
+
.flatten()
221
+
.map_or(500, |v| serde_json::to_string_pretty(&v).unwrap().len())
222
+
as u64;
223
+
// Cache it for next time
224
+
self.sizes.lock().unwrap().insert(ino as usize, size);
225
+
size
226
+
}
227
+
};
174
228
let blocks = ((size as u32 + BLKSIZE - 1) / BLKSIZE) as u64;
229
+
230
+
// Decode TID to get creation timestamp
231
+
let timestamp = tid_to_timestamp(&rkey).unwrap_or(time::UNIX_EPOCH);
232
+
175
233
fuser::FileAttr {
176
234
ino,
177
235
size,
178
236
blocks,
179
-
atime: time::UNIX_EPOCH,
180
-
mtime: time::UNIX_EPOCH,
181
-
ctime: time::UNIX_EPOCH,
182
-
crtime: time::UNIX_EPOCH,
237
+
atime: timestamp,
238
+
mtime: timestamp,
239
+
ctime: timestamp,
240
+
crtime: timestamp,
183
241
kind: fuser::FileType::RegularFile,
184
242
perm: 0o644,
185
243
nlink: 1,
···
187
245
gid: 20,
188
246
rdev: 0,
189
247
flags: 0,
190
-
blksize: 512,
248
+
blksize: BLKSIZE,
191
249
}
192
250
}
193
251
_ => panic!("zero"),
···
206
264
_fh: Option<u64>,
207
265
reply: fuser::ReplyAttr,
208
266
) {
209
-
if (ino as usize) < self.inodes.len() {
267
+
let len = self.inodes.lock().unwrap().len();
268
+
if (ino as usize) < len {
210
269
reply.attr(&TTL, &self.attr(ino as u64))
211
270
} else {
212
271
reply.error(libc::ENOENT)
···
221
280
offset: i64,
222
281
mut reply: fuser::ReplyDirectory,
223
282
) {
224
-
match self.inodes.get_index(ino as usize) {
283
+
let inodes = self.inodes.lock().unwrap();
284
+
match inodes.get_index(ino as usize) {
225
285
Some(PdsFsEntry::Root) => {
226
286
let entries: Vec<_> = vec![(ino, ".".to_string()), (ino, "..".to_string())]
227
287
.into_iter()
228
-
.chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
288
+
.chain(inodes.iter().enumerate().filter_map(|(i, e)| {
229
289
if let PdsFsEntry::Did(did) = e {
230
290
Some((i as u64, did.clone()))
231
291
} else {
···
233
293
}
234
294
}))
235
295
.collect();
296
+
drop(inodes);
236
297
237
298
for (index, (inode_num, name)) in
238
299
entries.into_iter().enumerate().skip(offset as usize)
···
249
310
reply.ok()
250
311
}
251
312
Some(PdsFsEntry::Did(_)) => {
252
-
let entries = vec![(ino, ".".to_string()), (1, "..".to_string())]
313
+
let entries: Vec<_> = vec![(ino, ".".to_string()), (1, "..".to_string())]
253
314
.into_iter()
254
-
.chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
315
+
.chain(inodes.iter().enumerate().filter_map(|(i, e)| {
255
316
if let PdsFsEntry::Collection(col) = e {
256
317
if col.parent == ino as usize {
257
318
Some((i as u64, col.nsid.clone()))
···
262
323
None
263
324
}
264
325
}))
265
-
.into_iter()
266
-
.enumerate()
267
-
.skip(offset as usize);
326
+
.collect();
327
+
drop(inodes);
268
328
269
-
for (index, (inode_num, name)) in entries {
329
+
for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) {
270
330
let full = reply.add(
271
331
inode_num,
272
332
(index + 1) as i64,
···
285
345
reply.ok();
286
346
}
287
347
Some(PdsFsEntry::Collection(c)) => {
288
-
let entries = [(ino, ".".to_string()), (c.parent as u64, "..".to_string())]
348
+
let parent_ino = c.parent;
349
+
let entries: Vec<_> = [(ino, ".".to_string()), (parent_ino as u64, "..".to_string())]
289
350
.into_iter()
290
-
.chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
351
+
.chain(inodes.iter().enumerate().filter_map(|(i, e)| {
291
352
if let PdsFsEntry::Record(record) = e {
292
353
if record.parent == ino as usize {
293
-
Some((i as u64, record.rkey.clone()))
354
+
Some((i as u64, format!("{}.json", record.rkey)))
294
355
} else {
295
356
None
296
357
}
···
298
359
None
299
360
}
300
361
}))
301
-
.into_iter()
302
-
.enumerate()
303
-
.skip(offset as usize);
362
+
.collect();
363
+
drop(inodes);
304
364
305
-
for (index, (inode_num, name)) in entries {
365
+
for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) {
306
366
let full = reply.add(
307
367
inode_num,
308
368
(index + 1) as i64,
···
320
380
321
381
reply.ok()
322
382
}
323
-
_ => reply.error(libc::ENOENT),
383
+
_ => {
384
+
drop(inodes);
385
+
reply.error(libc::ENOENT)
386
+
}
324
387
}
325
388
}
326
389
···
331
394
name: &std::ffi::OsStr,
332
395
reply: fuser::ReplyEntry,
333
396
) {
334
-
match self.inodes.get_index(parent as usize) {
397
+
let inodes = self.inodes.lock().unwrap();
398
+
match inodes.get_index(parent as usize) {
335
399
Some(PdsFsEntry::Root) => {
336
400
let did = PdsFsEntry::Did(name.to_string_lossy().to_string());
337
-
if let Some(ino) = self.inodes.get_index_of(&did) {
401
+
if let Some(ino) = inodes.get_index_of(&did) {
402
+
drop(inodes);
338
403
reply.entry(&TTL, &self.attr(ino as u64), 0);
339
404
} else {
405
+
drop(inodes);
340
406
reply.error(libc::ENOENT)
341
407
}
342
408
}
···
345
411
parent: parent as usize,
346
412
nsid: name.to_string_lossy().to_string(),
347
413
});
348
-
if let Some(ino) = self.inodes.get_index_of(&col) {
414
+
if let Some(ino) = inodes.get_index_of(&col) {
415
+
drop(inodes);
349
416
reply.entry(&TTL, &self.attr(ino as u64), 0);
350
417
} else {
418
+
drop(inodes);
351
419
reply.error(libc::ENOENT)
352
420
}
353
421
}
354
422
Some(PdsFsEntry::Collection(_)) => {
423
+
let name_str = name.to_string_lossy();
424
+
let rkey = name_str.strip_suffix(".json").unwrap_or(&name_str).to_string();
355
425
let record = PdsFsEntry::Record(PdsFsRecord {
356
426
parent: parent as usize,
357
-
rkey: name.to_string_lossy().to_string(),
427
+
rkey,
358
428
});
359
-
if let Some(ino) = self.inodes.get_index_of(&record) {
429
+
if let Some(ino) = inodes.get_index_of(&record) {
430
+
drop(inodes);
360
431
reply.entry(&TTL, &self.attr(ino as u64), 0);
361
432
} else {
433
+
drop(inodes);
362
434
reply.error(libc::ENOENT)
363
435
}
364
436
}
365
-
_ => reply.error(libc::ENOENT),
437
+
_ => {
438
+
drop(inodes);
439
+
reply.error(libc::ENOENT)
440
+
}
366
441
}
367
442
}
368
443
···
377
452
_lock: Option<u64>,
378
453
reply: fuser::ReplyData,
379
454
) {
380
-
let rt = tokio::runtime::Runtime::new().unwrap();
381
-
if let Some(PdsFsEntry::Record(r)) = self.inodes.get_index(ino as usize) {
382
-
let col = self.inodes[r.parent].unwrap_collection();
383
-
let did = self.inodes[col.parent].unwrap_did();
384
-
let repo = &mut self.repos[did];
455
+
let inodes = self.inodes.lock().unwrap();
456
+
if let Some(PdsFsEntry::Record(r)) = inodes.get_index(ino as usize) {
457
+
let col = inodes[r.parent].unwrap_collection();
458
+
let did = inodes[col.parent].unwrap_did().clone();
385
459
let key = format!("{}/{}", col.nsid, r.rkey);
460
+
let cache_key = format!("{}/{}", did, key);
461
+
drop(inodes);
386
462
387
-
if let Ok(Some(val)) = rt.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) {
463
+
// Check content cache first (for new records from firehose)
464
+
{
465
+
let cache = self.content_cache.lock().unwrap();
466
+
if let Some(content) = cache.get(&cache_key) {
467
+
reply.data(&content.as_bytes()[offset as usize..]);
468
+
return;
469
+
}
470
+
}
471
+
472
+
// Fall back to repo
473
+
let mut repos = self.repos.lock().unwrap();
474
+
let repo = &mut repos[&did];
475
+
if let Ok(Some(val)) = self.rt.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) {
388
476
reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]);
389
477
return;
390
478
}
479
+
} else {
480
+
drop(inodes);
391
481
}
392
482
reply.error(libc::ENOENT);
393
483
}
+52
-34
src/main.rs
+52
-34
src/main.rs
···
1
1
mod client;
2
2
mod error;
3
+
mod firehose;
3
4
mod fs;
4
5
mod resolver;
5
6
···
12
13
use futures::{StreamExt, stream};
13
14
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
14
15
use std::{
15
-
collections::HashMap,
16
16
io::{Cursor, Write},
17
17
path::PathBuf,
18
18
sync::Arc,
19
19
};
20
-
use xdg::BaseDirectories;
21
20
22
21
fn main() {
23
22
let rt = tokio::runtime::Runtime::new().unwrap();
···
59
58
let id = r.resolve(&h).await?;
60
59
let bytes = cached_download(&id, &b).await?;
61
60
let repo = build_repo(bytes).await?;
62
-
Ok::<_, error::Error>((id.did, repo))
61
+
Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo))
63
62
}
64
63
})
65
64
.collect::<Vec<_>>(),
···
68
67
for e in errors {
69
68
eprintln!("{:?}", e.as_ref().unwrap_err());
70
69
}
71
-
let repos = success
70
+
let repos_with_pds: Vec<_> = success
72
71
.into_iter()
73
72
.map(|s| s.unwrap())
74
-
.collect::<HashMap<_, _>>();
73
+
.collect();
75
74
76
75
// construct the fs
77
76
let mut fs = fs::PdsFs::new();
78
-
for (did, repo) in repos {
77
+
78
+
// Extract (did, pds) pairs for WebSocket tasks before consuming repos
79
+
let did_pds_pairs: Vec<_> = repos_with_pds.iter()
80
+
.map(|(did, pds, _)| (did.clone(), pds.clone()))
81
+
.collect();
82
+
83
+
// Consume repos_with_pds to add repos to filesystem
84
+
for (did, _, repo) in repos_with_pds {
79
85
rt.block_on(fs.add(did, repo))
80
86
}
81
87
88
+
// get shared state for WebSocket tasks
89
+
let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state();
90
+
82
91
// mount
83
-
let options = vec![MountOption::RO, MountOption::FSName("pdsfs".to_string())];
84
-
let join_handle = fuser::spawn_mount2(fs, &mountpoint, &options).unwrap();
92
+
let options = vec![
93
+
MountOption::RO,
94
+
MountOption::FSName("pdsfs".to_string()),
95
+
MountOption::AllowOther,
96
+
MountOption::CUSTOM("local".to_string()),
97
+
MountOption::CUSTOM("volname=pdsfs".to_string()),
98
+
];
99
+
100
+
// Create session and get notifier for Finder refresh
101
+
let session = fuser::Session::new(fs, &mountpoint, &options).unwrap();
102
+
let notifier = session.notifier();
103
+
let _bg = session.spawn().unwrap();
104
+
105
+
// spawn WebSocket subscription tasks for each DID using the runtime handle
106
+
let rt_handle = rt.handle().clone();
107
+
for (did, pds) in did_pds_pairs {
108
+
let inodes_clone = Arc::clone(&inodes_arc);
109
+
let sizes_clone = Arc::clone(&sizes_arc);
110
+
let content_cache_clone = Arc::clone(&content_cache_arc);
111
+
let notifier_clone = notifier.clone();
112
+
113
+
rt_handle.spawn(async move {
114
+
if let Err(e) = firehose::subscribe_to_repo::<atrium_repo::blockstore::CarStore<std::io::Cursor<Vec<u8>>>>(
115
+
did,
116
+
pds,
117
+
inodes_clone,
118
+
sizes_clone,
119
+
content_cache_clone,
120
+
notifier_clone,
121
+
).await {
122
+
eprintln!("WebSocket error: {:?}", e);
123
+
}
124
+
});
125
+
}
85
126
86
127
println!("mounted at {mountpoint:?}");
87
128
print!("hit enter to unmount and exit...");
···
90
131
// Wait for user input
91
132
let mut input = String::new();
92
133
std::io::stdin().read_line(&mut input).unwrap();
93
-
94
-
join_handle.join();
95
-
std::fs::remove_dir(&mountpoint).unwrap();
96
134
97
135
println!("unmounted {mountpoint:?}");
98
136
}
···
111
149
pb.enable_steady_tick(std::time::Duration::from_millis(100));
112
150
pb = m.add(pb);
113
151
114
-
let dirs = BaseDirectories::new();
115
-
116
-
let dir = dirs
117
-
.get_cache_home()
118
-
.expect("$HOME is absent")
119
-
.join("pdsfs");
120
-
tokio::fs::create_dir_all(&dir).await?;
121
-
122
-
let file = dir.join(&id.did);
123
-
let exists = std::fs::exists(&file)?;
124
-
125
-
let bytes = if !exists {
126
-
pb.set_message(format!("downloading CAR file for...{}", id.did));
127
-
download_car_file(id, &pb).await?
128
-
} else {
129
-
pb.set_message(format!("using cached CAR file for...{}", id.did));
130
-
tokio::fs::read(&file).await?
131
-
};
132
-
133
-
// write to disk
134
-
if !exists {
135
-
tokio::fs::write(&file, &bytes).await?;
136
-
}
152
+
// Always download fresh - no caching for now to ensure up-to-date data
153
+
pb.set_message(format!("downloading CAR file for...{}", id.did));
154
+
let bytes = download_car_file(id, &pb).await?;
137
155
138
156
pb.finish();
139
157
Ok(bytes)