+277
src/actor_store/blob_fs.rs
+277
src/actor_store/blob_fs.rs
···
1
+
//! File system implementation of blob storage
2
+
//! Based on the S3 implementation but using local file system instead
3
+
use anyhow::Result;
4
+
use std::str::FromStr;
5
+
use cidv10::Cid;
6
+
use rsky_common::get_random_str;
7
+
use rsky_repo::error::BlobError;
8
+
use std::path::{Path, PathBuf};
9
+
use tokio::fs as async_fs;
10
+
use tokio::io::AsyncWriteExt;
11
+
use tracing::{debug, error, warn};
12
+
13
+
/// ByteStream implementation for blob data
14
+
pub struct ByteStream {
15
+
pub bytes: Vec<u8>,
16
+
}
17
+
18
+
impl ByteStream {
19
+
/// Create a new ByteStream with the given bytes
20
+
pub const fn new(bytes: Vec<u8>) -> Self {
21
+
Self { bytes }
22
+
}
23
+
24
+
/// Collect the bytes from the stream
25
+
pub async fn collect(self) -> Result<Vec<u8>> {
26
+
Ok(self.bytes)
27
+
}
28
+
}
29
+
30
+
/// Path information for moving a blob
31
+
struct MoveObject {
32
+
from: PathBuf,
33
+
to: PathBuf,
34
+
}
35
+
36
+
/// File system implementation of blob storage
37
+
pub struct BlobStoreFs {
38
+
/// Base directory for storing blobs
39
+
pub base_dir: PathBuf,
40
+
/// DID of the actor
41
+
pub did: String,
42
+
}
43
+
44
+
impl BlobStoreFs {
45
+
/// Create a new file system blob store for the given DID and base directory
46
+
pub fn new(did: String, base_dir: PathBuf) -> Self {
47
+
Self { base_dir, did }
48
+
}
49
+
50
+
/// Create a factory function for blob stores
51
+
pub fn creator(base_dir: PathBuf) -> Box<dyn Fn(String) -> BlobStoreFs> {
52
+
let base_dir_clone = base_dir.clone();
53
+
Box::new(move |did: String| BlobStoreFs::new(did, base_dir_clone.clone()))
54
+
}
55
+
56
+
/// Generate a random key for temporary storage
57
+
fn gen_key(&self) -> String {
58
+
get_random_str()
59
+
}
60
+
61
+
/// Get path to the temporary blob storage
62
+
fn get_tmp_path(&self, key: &str) -> PathBuf {
63
+
self.base_dir.join("tmp").join(&self.did).join(key)
64
+
}
65
+
66
+
/// Get path to the stored blob with appropriate sharding
67
+
fn get_stored_path(&self, cid: Cid) -> PathBuf {
68
+
let cid_str = cid.to_string();
69
+
70
+
// Create two-level sharded structure based on CID
71
+
// First 10 chars for level 1, next 10 chars for level 2
72
+
let first_level = if cid_str.len() >= 10 {
73
+
&cid_str[0..10]
74
+
} else {
75
+
&cid_str
76
+
};
77
+
78
+
let second_level = if cid_str.len() >= 20 {
79
+
&cid_str[10..20]
80
+
} else {
81
+
"default"
82
+
};
83
+
84
+
self.base_dir
85
+
.join("blocks")
86
+
.join(&self.did)
87
+
.join(first_level)
88
+
.join(second_level)
89
+
.join(&cid_str)
90
+
}
91
+
92
+
/// Get path to the quarantined blob
93
+
fn get_quarantined_path(&self, cid: Cid) -> PathBuf {
94
+
let cid_str = cid.to_string();
95
+
self.base_dir.join("quarantine").join(&self.did).join(&cid_str)
96
+
}
97
+
98
+
/// Store a blob temporarily
99
+
pub async fn put_temp(&self, bytes: Vec<u8>) -> Result<String> {
100
+
let key = self.gen_key();
101
+
let temp_path = self.get_tmp_path(&key);
102
+
103
+
// Ensure the directory exists
104
+
if let Some(parent) = temp_path.parent() {
105
+
async_fs::create_dir_all(parent).await?;
106
+
}
107
+
108
+
// Write the temporary blob
109
+
let mut file = async_fs::File::create(&temp_path).await?;
110
+
file.write_all(&bytes).await?;
111
+
file.flush().await?;
112
+
113
+
debug!("Stored temp blob at: {:?}", temp_path);
114
+
Ok(key)
115
+
}
116
+
117
+
/// Make a temporary blob permanent by moving it to the blob store
118
+
pub async fn make_permanent(&self, key: String, cid: Cid) -> Result<()> {
119
+
let already_has = self.has_stored(cid).await?;
120
+
121
+
if !already_has {
122
+
// Move the temporary blob to permanent storage
123
+
self.move_object(MoveObject {
124
+
from: self.get_tmp_path(&key),
125
+
to: self.get_stored_path(cid),
126
+
}).await?;
127
+
debug!("Moved temp blob to permanent: {} -> {}", key, cid);
128
+
} else {
129
+
// Already saved, so just delete the temp
130
+
let temp_path = self.get_tmp_path(&key);
131
+
if temp_path.exists() {
132
+
async_fs::remove_file(temp_path).await?;
133
+
debug!("Deleted temp blob as permanent already exists: {}", key);
134
+
}
135
+
}
136
+
137
+
Ok(())
138
+
}
139
+
140
+
/// Store a blob directly as permanent
141
+
pub async fn put_permanent(&self, cid: Cid, bytes: Vec<u8>) -> Result<()> {
142
+
let target_path = self.get_stored_path(cid);
143
+
144
+
// Ensure the directory exists
145
+
if let Some(parent) = target_path.parent() {
146
+
async_fs::create_dir_all(parent).await?;
147
+
}
148
+
149
+
// Write the blob
150
+
let mut file = async_fs::File::create(&target_path).await?;
151
+
file.write_all(&bytes).await?;
152
+
file.flush().await?;
153
+
154
+
debug!("Stored permanent blob: {}", cid);
155
+
Ok(())
156
+
}
157
+
158
+
/// Quarantine a blob by moving it to the quarantine area
159
+
pub async fn quarantine(&self, cid: Cid) -> Result<()> {
160
+
self.move_object(MoveObject {
161
+
from: self.get_stored_path(cid),
162
+
to: self.get_quarantined_path(cid),
163
+
}).await?;
164
+
165
+
debug!("Quarantined blob: {}", cid);
166
+
Ok(())
167
+
}
168
+
169
+
/// Unquarantine a blob by moving it back to regular storage
170
+
pub async fn unquarantine(&self, cid: Cid) -> Result<()> {
171
+
self.move_object(MoveObject {
172
+
from: self.get_quarantined_path(cid),
173
+
to: self.get_stored_path(cid),
174
+
}).await?;
175
+
176
+
debug!("Unquarantined blob: {}", cid);
177
+
Ok(())
178
+
}
179
+
180
+
/// Get a blob as a stream
181
+
async fn get_object(&self, cid: Cid) -> Result<ByteStream> {
182
+
let blob_path = self.get_stored_path(cid);
183
+
184
+
match async_fs::read(&blob_path).await {
185
+
Ok(bytes) => Ok(ByteStream::new(bytes)),
186
+
Err(e) => {
187
+
error!("Failed to read blob at path {:?}: {}", blob_path, e);
188
+
Err(anyhow::Error::new(BlobError::BlobNotFoundError))
189
+
}
190
+
}
191
+
}
192
+
193
+
/// Get blob bytes
194
+
pub async fn get_bytes(&self, cid: Cid) -> Result<Vec<u8>> {
195
+
let stream = self.get_object(cid).await?;
196
+
stream.collect().await
197
+
}
198
+
199
+
/// Get a blob as a stream
200
+
pub async fn get_stream(&self, cid: Cid) -> Result<ByteStream> {
201
+
self.get_object(cid).await
202
+
}
203
+
204
+
/// Delete a blob by CID string
205
+
pub async fn delete(&self, cid_str: String) -> Result<()> {
206
+
match Cid::from_str(&cid_str) {
207
+
Ok(cid) => self.delete_path(self.get_stored_path(cid)).await,
208
+
Err(e) => {
209
+
warn!("Invalid CID: {} - {}", cid_str, e);
210
+
Err(anyhow::anyhow!("Invalid CID: {}", e))
211
+
}
212
+
}
213
+
}
214
+
215
+
/// Delete multiple blobs by CID
216
+
pub async fn delete_many(&self, cids: Vec<Cid>) -> Result<()> {
217
+
let mut futures = Vec::with_capacity(cids.len());
218
+
219
+
for cid in cids {
220
+
futures.push(self.delete_path(self.get_stored_path(cid)));
221
+
}
222
+
223
+
// Execute all delete operations concurrently
224
+
let results = futures::future::join_all(futures).await;
225
+
226
+
// Count errors but don't fail the operation
227
+
let error_count = results.iter().filter(|r| r.is_err()).count();
228
+
if error_count > 0 {
229
+
warn!("{} errors occurred while deleting {} blobs", error_count, results.len());
230
+
}
231
+
232
+
Ok(())
233
+
}
234
+
235
+
/// Check if a blob is stored in the regular storage
236
+
pub async fn has_stored(&self, cid: Cid) -> Result<bool> {
237
+
let blob_path = self.get_stored_path(cid);
238
+
Ok(blob_path.exists())
239
+
}
240
+
241
+
/// Check if a temporary blob exists
242
+
pub async fn has_temp(&self, key: String) -> Result<bool> {
243
+
let temp_path = self.get_tmp_path(&key);
244
+
Ok(temp_path.exists())
245
+
}
246
+
247
+
/// Helper function to delete a file at the given path
248
+
async fn delete_path(&self, path: PathBuf) -> Result<()> {
249
+
if path.exists() {
250
+
async_fs::remove_file(&path).await?;
251
+
debug!("Deleted file at: {:?}", path);
252
+
Ok(())
253
+
} else {
254
+
Err(anyhow::Error::new(BlobError::BlobNotFoundError))
255
+
}
256
+
}
257
+
258
+
/// Move a blob from one path to another
259
+
async fn move_object(&self, mov: MoveObject) -> Result<()> {
260
+
// Ensure the source exists
261
+
if !mov.from.exists() {
262
+
return Err(anyhow::Error::new(BlobError::BlobNotFoundError));
263
+
}
264
+
265
+
// Ensure the target directory exists
266
+
if let Some(parent) = mov.to.parent() {
267
+
async_fs::create_dir_all(parent).await?;
268
+
}
269
+
270
+
// Copy first, then delete source after success
271
+
async_fs::copy(&mov.from, &mov.to).await?;
272
+
async_fs::remove_file(&mov.from).await?;
273
+
274
+
debug!("Moved blob: {:?} -> {:?}", mov.from, mov.to);
275
+
Ok(())
276
+
}
277
+
}