+43
-33
src/actor_store/blob_fs.rs
+43
-33
src/actor_store/blob_fs.rs
···
1
//! File system implementation of blob storage
2
//! Based on the S3 implementation but using local file system instead
3
use anyhow::Result;
4
-
use std::str::FromStr;
5
use cidv10::Cid;
6
use rsky_common::get_random_str;
7
use rsky_repo::error::BlobError;
8
use std::path::{Path, PathBuf};
9
use tokio::fs as async_fs;
10
use tokio::io::AsyncWriteExt;
11
use tracing::{debug, error, warn};
···
43
44
impl BlobStoreFs {
45
/// Create a new file system blob store for the given DID and base directory
46
-
pub fn new(did: String, base_dir: PathBuf) -> Self {
47
Self { base_dir, did }
48
}
49
50
/// Create a factory function for blob stores
51
-
pub fn creator(base_dir: PathBuf) -> Box<dyn Fn(String) -> BlobStoreFs> {
52
-
let base_dir_clone = base_dir.clone();
53
-
Box::new(move |did: String| BlobStoreFs::new(did, base_dir_clone.clone()))
54
}
55
56
/// Generate a random key for temporary storage
···
66
/// Get path to the stored blob with appropriate sharding
67
fn get_stored_path(&self, cid: Cid) -> PathBuf {
68
let cid_str = cid.to_string();
69
-
70
// Create two-level sharded structure based on CID
71
// First 10 chars for level 1, next 10 chars for level 2
72
let first_level = if cid_str.len() >= 10 {
···
74
} else {
75
&cid_str
76
};
77
-
78
let second_level = if cid_str.len() >= 20 {
79
&cid_str[10..20]
80
} else {
81
"default"
82
};
83
-
84
self.base_dir
85
.join("blocks")
86
.join(&self.did)
···
92
/// Get path to the quarantined blob
93
fn get_quarantined_path(&self, cid: Cid) -> PathBuf {
94
let cid_str = cid.to_string();
95
-
self.base_dir.join("quarantine").join(&self.did).join(&cid_str)
96
}
97
98
/// Store a blob temporarily
99
pub async fn put_temp(&self, bytes: Vec<u8>) -> Result<String> {
100
let key = self.gen_key();
101
let temp_path = self.get_tmp_path(&key);
102
-
103
// Ensure the directory exists
104
if let Some(parent) = temp_path.parent() {
105
async_fs::create_dir_all(parent).await?;
106
}
107
-
108
// Write the temporary blob
109
let mut file = async_fs::File::create(&temp_path).await?;
110
file.write_all(&bytes).await?;
111
file.flush().await?;
112
-
113
debug!("Stored temp blob at: {:?}", temp_path);
114
Ok(key)
115
}
···
117
/// Make a temporary blob permanent by moving it to the blob store
118
pub async fn make_permanent(&self, key: String, cid: Cid) -> Result<()> {
119
let already_has = self.has_stored(cid).await?;
120
-
121
if !already_has {
122
// Move the temporary blob to permanent storage
123
self.move_object(MoveObject {
124
from: self.get_tmp_path(&key),
125
to: self.get_stored_path(cid),
126
-
}).await?;
127
debug!("Moved temp blob to permanent: {} -> {}", key, cid);
128
} else {
129
// Already saved, so just delete the temp
···
133
debug!("Deleted temp blob as permanent already exists: {}", key);
134
}
135
}
136
-
137
Ok(())
138
}
139
140
/// Store a blob directly as permanent
141
pub async fn put_permanent(&self, cid: Cid, bytes: Vec<u8>) -> Result<()> {
142
let target_path = self.get_stored_path(cid);
143
-
144
// Ensure the directory exists
145
if let Some(parent) = target_path.parent() {
146
async_fs::create_dir_all(parent).await?;
147
}
148
-
149
// Write the blob
150
let mut file = async_fs::File::create(&target_path).await?;
151
file.write_all(&bytes).await?;
152
file.flush().await?;
153
-
154
debug!("Stored permanent blob: {}", cid);
155
Ok(())
156
}
···
160
self.move_object(MoveObject {
161
from: self.get_stored_path(cid),
162
to: self.get_quarantined_path(cid),
163
-
}).await?;
164
-
165
debug!("Quarantined blob: {}", cid);
166
Ok(())
167
}
···
171
self.move_object(MoveObject {
172
from: self.get_quarantined_path(cid),
173
to: self.get_stored_path(cid),
174
-
}).await?;
175
-
176
debug!("Unquarantined blob: {}", cid);
177
Ok(())
178
}
···
180
/// Get a blob as a stream
181
async fn get_object(&self, cid: Cid) -> Result<ByteStream> {
182
let blob_path = self.get_stored_path(cid);
183
-
184
match async_fs::read(&blob_path).await {
185
Ok(bytes) => Ok(ByteStream::new(bytes)),
186
Err(e) => {
···
215
/// Delete multiple blobs by CID
216
pub async fn delete_many(&self, cids: Vec<Cid>) -> Result<()> {
217
let mut futures = Vec::with_capacity(cids.len());
218
-
219
for cid in cids {
220
futures.push(self.delete_path(self.get_stored_path(cid)));
221
}
222
-
223
// Execute all delete operations concurrently
224
let results = futures::future::join_all(futures).await;
225
-
226
// Count errors but don't fail the operation
227
let error_count = results.iter().filter(|r| r.is_err()).count();
228
if error_count > 0 {
229
-
warn!("{} errors occurred while deleting {} blobs", error_count, results.len());
230
}
231
-
232
Ok(())
233
}
234
···
261
if !mov.from.exists() {
262
return Err(anyhow::Error::new(BlobError::BlobNotFoundError));
263
}
264
-
265
// Ensure the target directory exists
266
if let Some(parent) = mov.to.parent() {
267
async_fs::create_dir_all(parent).await?;
268
}
269
-
270
// Copy first, then delete source after success
271
-
async_fs::copy(&mov.from, &mov.to).await?;
272
async_fs::remove_file(&mov.from).await?;
273
-
274
debug!("Moved blob: {:?} -> {:?}", mov.from, mov.to);
275
Ok(())
276
}
277
-
}
···
1
//! File system implementation of blob storage
2
//! Based on the S3 implementation but using local file system instead
3
use anyhow::Result;
4
use cidv10::Cid;
5
use rsky_common::get_random_str;
6
use rsky_repo::error::BlobError;
7
use std::path::{Path, PathBuf};
8
+
use std::str::FromStr;
9
use tokio::fs as async_fs;
10
use tokio::io::AsyncWriteExt;
11
use tracing::{debug, error, warn};
···
43
44
impl BlobStoreFs {
45
/// Create a new file system blob store for the given DID and base directory
46
+
pub const fn new(did: String, base_dir: PathBuf) -> Self {
47
Self { base_dir, did }
48
}
49
50
/// Create a factory function for blob stores
51
+
pub fn creator(base_dir: PathBuf) -> Box<dyn Fn(String) -> Self> {
52
+
let base_dir_clone = base_dir;
53
+
Box::new(move |did: String| Self::new(did, base_dir_clone.clone()))
54
}
55
56
/// Generate a random key for temporary storage
···
66
/// Get path to the stored blob with appropriate sharding
67
fn get_stored_path(&self, cid: Cid) -> PathBuf {
68
let cid_str = cid.to_string();
69
+
70
// Create two-level sharded structure based on CID
71
// First 10 chars for level 1, next 10 chars for level 2
72
let first_level = if cid_str.len() >= 10 {
···
74
} else {
75
&cid_str
76
};
77
+
78
let second_level = if cid_str.len() >= 20 {
79
&cid_str[10..20]
80
} else {
81
"default"
82
};
83
+
84
self.base_dir
85
.join("blocks")
86
.join(&self.did)
···
92
/// Get path to the quarantined blob
93
fn get_quarantined_path(&self, cid: Cid) -> PathBuf {
94
let cid_str = cid.to_string();
95
+
self.base_dir
96
+
.join("quarantine")
97
+
.join(&self.did)
98
+
.join(&cid_str)
99
}
100
101
/// Store a blob temporarily
102
pub async fn put_temp(&self, bytes: Vec<u8>) -> Result<String> {
103
let key = self.gen_key();
104
let temp_path = self.get_tmp_path(&key);
105
+
106
// Ensure the directory exists
107
if let Some(parent) = temp_path.parent() {
108
async_fs::create_dir_all(parent).await?;
109
}
110
+
111
// Write the temporary blob
112
let mut file = async_fs::File::create(&temp_path).await?;
113
file.write_all(&bytes).await?;
114
file.flush().await?;
115
+
116
debug!("Stored temp blob at: {:?}", temp_path);
117
Ok(key)
118
}
···
120
/// Make a temporary blob permanent by moving it to the blob store
121
pub async fn make_permanent(&self, key: String, cid: Cid) -> Result<()> {
122
let already_has = self.has_stored(cid).await?;
123
+
124
if !already_has {
125
// Move the temporary blob to permanent storage
126
self.move_object(MoveObject {
127
from: self.get_tmp_path(&key),
128
to: self.get_stored_path(cid),
129
+
})
130
+
.await?;
131
debug!("Moved temp blob to permanent: {} -> {}", key, cid);
132
} else {
133
// Already saved, so just delete the temp
···
137
debug!("Deleted temp blob as permanent already exists: {}", key);
138
}
139
}
140
+
141
Ok(())
142
}
143
144
/// Store a blob directly as permanent
145
pub async fn put_permanent(&self, cid: Cid, bytes: Vec<u8>) -> Result<()> {
146
let target_path = self.get_stored_path(cid);
147
+
148
// Ensure the directory exists
149
if let Some(parent) = target_path.parent() {
150
async_fs::create_dir_all(parent).await?;
151
}
152
+
153
// Write the blob
154
let mut file = async_fs::File::create(&target_path).await?;
155
file.write_all(&bytes).await?;
156
file.flush().await?;
157
+
158
debug!("Stored permanent blob: {}", cid);
159
Ok(())
160
}
···
164
self.move_object(MoveObject {
165
from: self.get_stored_path(cid),
166
to: self.get_quarantined_path(cid),
167
+
})
168
+
.await?;
169
+
170
debug!("Quarantined blob: {}", cid);
171
Ok(())
172
}
···
176
self.move_object(MoveObject {
177
from: self.get_quarantined_path(cid),
178
to: self.get_stored_path(cid),
179
+
})
180
+
.await?;
181
+
182
debug!("Unquarantined blob: {}", cid);
183
Ok(())
184
}
···
186
/// Get a blob as a stream
187
async fn get_object(&self, cid: Cid) -> Result<ByteStream> {
188
let blob_path = self.get_stored_path(cid);
189
+
190
match async_fs::read(&blob_path).await {
191
Ok(bytes) => Ok(ByteStream::new(bytes)),
192
Err(e) => {
···
221
/// Delete multiple blobs by CID
222
pub async fn delete_many(&self, cids: Vec<Cid>) -> Result<()> {
223
let mut futures = Vec::with_capacity(cids.len());
224
+
225
for cid in cids {
226
futures.push(self.delete_path(self.get_stored_path(cid)));
227
}
228
+
229
// Execute all delete operations concurrently
230
let results = futures::future::join_all(futures).await;
231
+
232
// Count errors but don't fail the operation
233
let error_count = results.iter().filter(|r| r.is_err()).count();
234
if error_count > 0 {
235
+
warn!(
236
+
"{} errors occurred while deleting {} blobs",
237
+
error_count,
238
+
results.len()
239
+
);
240
}
241
+
242
Ok(())
243
}
244
···
271
if !mov.from.exists() {
272
return Err(anyhow::Error::new(BlobError::BlobNotFoundError));
273
}
274
+
275
// Ensure the target directory exists
276
if let Some(parent) = mov.to.parent() {
277
async_fs::create_dir_all(parent).await?;
278
}
279
+
280
// Copy first, then delete source after success
281
+
_ = async_fs::copy(&mov.from, &mov.to).await?;
282
async_fs::remove_file(&mov.from).await?;
283
+
284
debug!("Moved blob: {:?} -> {:?}", mov.from, mov.to);
285
Ok(())
286
}
287
+
}