+129
-326
src/actor_store/sql_blob.rs
+129
-326
src/actor_store/sql_blob.rs
···
3
3
clippy::single_char_lifetime_names,
4
4
unused_qualifications
5
5
)]
6
-
use std::{path::PathBuf, str::FromStr as _, sync::Arc};
7
-
8
6
use anyhow::{Context, Result};
9
7
use cidv10::Cid;
10
-
use diesel::*;
8
+
use diesel::prelude::*;
11
9
use rsky_common::get_random_str;
12
-
use tokio::fs;
10
+
use std::sync::Arc;
13
11
14
12
use crate::db::DbConn;
15
-
16
-
/// Type for stream of blob data
17
-
pub type BlobStream = Box<dyn std::io::Read + Send>;
18
13
19
14
/// ByteStream implementation for blob data
20
15
pub struct ByteStream {
···
38
33
pub db: Arc<DbConn>,
39
34
/// DID of the actor
40
35
pub did: String,
41
-
/// Path for blob storage
42
-
pub path: PathBuf,
43
-
}
44
-
45
-
/// Configuration for the blob store
46
-
pub struct BlobConfig {
47
-
/// Base path for blob storage
48
-
pub path: PathBuf,
49
-
}
50
-
51
-
/// Represents a move operation for blobs
52
-
struct MoveObject {
53
-
from: String,
54
-
to: String,
55
36
}
56
37
57
38
/// Blob table structure for SQL operations
···
60
41
struct BlobEntry {
61
42
cid: String,
62
43
did: String,
63
-
path: String,
44
+
data: Vec<u8>,
64
45
size: i32,
65
46
mime_type: String,
66
47
quarantined: bool,
67
48
temp: bool,
49
+
temp_key: Option<String>,
68
50
}
69
51
70
52
// Table definition for blobs
···
72
54
blobs (cid, did) {
73
55
cid -> Text,
74
56
did -> Text,
75
-
path -> Text,
57
+
data -> Binary,
76
58
size -> Integer,
77
59
mime_type -> Text,
78
60
quarantined -> Bool,
79
61
temp -> Bool,
62
+
temp_key -> Nullable<Text>,
80
63
}
81
64
}
82
65
83
66
impl BlobStoreSql {
84
67
/// Create a new SQL-based blob store for the given DID
85
-
pub fn new(did: String, cfg: &BlobConfig, db: Arc<DbConn>) -> Self {
86
-
let actor_path = cfg.path.join(&did);
87
-
88
-
// Create actor directory if it doesn't exist
89
-
if !actor_path.exists() {
90
-
// Use blocking to avoid complicating this constructor
91
-
std::fs::create_dir_all(&actor_path).unwrap_or_else(|_| {
92
-
panic!("Failed to create blob directory: {}", actor_path.display())
93
-
});
94
-
}
95
-
96
-
BlobStoreSql {
97
-
db,
98
-
did,
99
-
path: actor_path,
100
-
}
68
+
pub fn new(did: String, db: Arc<DbConn>) -> Self {
69
+
BlobStoreSql { db, did }
101
70
}
102
71
103
72
/// Create a factory function for blob stores
104
-
pub fn creator(cfg: &BlobConfig, db: Arc<DbConn>) -> Box<dyn Fn(String) -> BlobStoreSql + '_> {
73
+
pub fn creator(db: Arc<DbConn>) -> Box<dyn Fn(String) -> BlobStoreSql> {
105
74
let db_clone = db.clone();
106
-
Box::new(move |did: String| BlobStoreSql::new(did, cfg, db_clone.clone()))
75
+
Box::new(move |did: String| BlobStoreSql::new(did, db_clone.clone()))
107
76
}
108
77
109
78
/// Generate a random key for temporary blobs
···
111
80
get_random_str()
112
81
}
113
82
114
-
/// Get the path for a temporary blob
115
-
fn get_tmp_path(&self, key: &str) -> PathBuf {
116
-
self.path.join("tmp").join(key)
117
-
}
118
-
119
-
/// Get the filesystem path for a stored blob
120
-
fn get_stored_path(&self, cid: &Cid) -> PathBuf {
121
-
self.path.join("blocks").join(cid.to_string())
122
-
}
123
-
124
-
/// Get the filesystem path for a quarantined blob
125
-
fn get_quarantined_path(&self, cid: &Cid) -> PathBuf {
126
-
self.path.join("quarantine").join(cid.to_string())
127
-
}
128
-
129
83
/// Store a blob temporarily
130
84
pub async fn put_temp(&self, bytes: Vec<u8>) -> Result<String> {
131
85
let key = self.gen_key();
132
-
let tmp_path = self.get_tmp_path(&key);
133
-
134
-
// Ensure the directory exists
135
-
if let Some(parent) = tmp_path.parent() {
136
-
fs::create_dir_all(parent)
137
-
.await
138
-
.context("Failed to create temp directory")?;
139
-
}
140
-
141
-
// Write the blob data to the file
142
-
fs::write(&tmp_path, &bytes)
143
-
.await
144
-
.context("Failed to write temporary blob")?;
145
-
146
-
// Clone values to be used in the closure
147
86
let did_clone = self.did.clone();
148
-
let tmp_path_str = tmp_path.to_string_lossy().to_string();
149
87
let bytes_len = bytes.len() as i32;
150
88
151
-
// Store metadata in the database (will be updated when made permanent)
89
+
// Store in the database with temp flag
90
+
let key_clone = key.clone();
152
91
self.db
153
92
.run(move |conn| {
154
93
let entry = BlobEntry {
155
94
cid: "temp".to_string(), // Will be updated when made permanent
156
95
did: did_clone,
157
-
path: tmp_path_str,
96
+
data: bytes,
158
97
size: bytes_len,
159
98
mime_type: "application/octet-stream".to_string(), // Will be updated when made permanent
160
99
quarantined: false,
161
100
temp: true,
101
+
temp_key: Some(key_clone),
162
102
};
163
103
164
104
diesel::insert_into(blobs::table)
165
105
.values(&entry)
166
106
.execute(conn)
167
-
.context("Failed to insert temporary blob metadata")
107
+
.context("Failed to insert temporary blob data")
168
108
})
169
109
.await?;
170
110
···
175
115
pub async fn make_permanent(&self, key: String, cid: Cid) -> Result<()> {
176
116
let already_has = self.has_stored(cid).await?;
177
117
if !already_has {
178
-
let tmp_path = self.get_tmp_path(&key);
179
-
let stored_path = self.get_stored_path(&cid);
180
-
181
-
// Ensure parent directory exists
182
-
if let Some(parent) = stored_path.parent() {
183
-
fs::create_dir_all(parent)
184
-
.await
185
-
.context("Failed to create blocks directory")?;
186
-
}
187
-
188
-
// Read the bytes
189
-
let bytes = fs::read(&tmp_path)
190
-
.await
191
-
.context("Failed to read temporary blob")?;
192
-
193
-
// Write to permanent location
194
-
fs::write(&stored_path, &bytes)
195
-
.await
196
-
.context("Failed to write permanent blob")?;
118
+
let cid_str = cid.to_string();
119
+
let did_clone = self.did.clone();
197
120
198
-
// Update database metadata
199
-
let tmp_path_clone = tmp_path.clone();
121
+
// Update database record to make it permanent
200
122
self.db
201
123
.run(move |conn| {
202
-
// Update the entry with the correct CID and path
203
124
diesel::update(blobs::table)
204
-
.filter(blobs::path.eq(tmp_path_clone.to_string_lossy().to_string()))
125
+
.filter(blobs::temp_key.eq(&key))
126
+
.filter(blobs::did.eq(&did_clone))
205
127
.set((
206
-
blobs::cid.eq(cid.to_string()),
207
-
blobs::path.eq(stored_path.to_string_lossy().to_string()),
128
+
blobs::cid.eq(&cid_str),
208
129
blobs::temp.eq(false),
130
+
blobs::temp_key.eq::<Option<String>>(None),
209
131
))
210
132
.execute(conn)
211
-
.context("Failed to update blob metadata")
133
+
.context("Failed to update blob to permanent status")
212
134
})
213
135
.await?;
214
-
215
-
// Remove the temporary file
216
-
fs::remove_file(tmp_path)
217
-
.await
218
-
.context("Failed to remove temporary blob")?;
219
136
220
137
Ok(())
221
138
} else {
222
-
// Already saved, so delete the temp file
223
-
let tmp_path = self.get_tmp_path(&key);
224
-
if tmp_path.exists() {
225
-
fs::remove_file(tmp_path)
226
-
.await
227
-
.context("Failed to remove existing temporary blob")?;
228
-
}
139
+
// Already exists, so delete the temporary one
140
+
let did_clone = self.did.clone();
141
+
142
+
self.db
143
+
.run(move |conn| {
144
+
diesel::delete(blobs::table)
145
+
.filter(blobs::temp_key.eq(&key))
146
+
.filter(blobs::did.eq(&did_clone))
147
+
.execute(conn)
148
+
.context("Failed to delete redundant temporary blob")
149
+
})
150
+
.await?;
151
+
229
152
Ok(())
230
153
}
231
154
}
232
155
233
156
/// Store a blob directly as permanent
234
157
pub async fn put_permanent(&self, cid: Cid, bytes: Vec<u8>) -> Result<()> {
235
-
let stored_path = self.get_stored_path(&cid);
236
-
237
-
// Ensure parent directory exists
238
-
if let Some(parent) = stored_path.parent() {
239
-
fs::create_dir_all(parent)
240
-
.await
241
-
.context("Failed to create blocks directory")?;
242
-
}
243
-
244
-
// Write to permanent location
245
-
fs::write(&stored_path, &bytes)
246
-
.await
247
-
.context("Failed to write permanent blob")?;
248
-
249
-
let stored_path_str = stored_path.to_string_lossy().to_string();
250
158
let cid_str = cid.to_string();
251
159
let did_clone = self.did.clone();
252
160
let bytes_len = bytes.len() as i32;
253
161
254
-
// Update database metadata
162
+
// Store directly in the database
255
163
self.db
256
164
.run(move |conn| {
165
+
let data_clone = bytes.clone();
257
166
let entry = BlobEntry {
258
-
cid: cid_str,
259
-
did: did_clone,
260
-
path: stored_path_str.clone(),
167
+
cid: cid_str.clone(),
168
+
did: did_clone.clone(),
169
+
data: bytes,
261
170
size: bytes_len,
262
171
mime_type: "application/octet-stream".to_string(), // Could be improved with MIME detection
263
172
quarantined: false,
264
173
temp: false,
174
+
temp_key: None,
265
175
};
266
176
267
177
diesel::insert_into(blobs::table)
268
178
.values(&entry)
269
179
.on_conflict((blobs::cid, blobs::did))
270
180
.do_update()
271
-
.set(blobs::path.eq(stored_path_str))
181
+
.set(blobs::data.eq(data_clone))
272
182
.execute(conn)
273
-
.context("Failed to insert permanent blob metadata")
183
+
.context("Failed to insert permanent blob data")
274
184
})
275
185
.await?;
276
186
···
279
189
280
190
/// Quarantine a blob
281
191
pub async fn quarantine(&self, cid: Cid) -> Result<()> {
282
-
let stored_path = self.get_stored_path(&cid);
283
-
let quarantined_path = self.get_quarantined_path(&cid);
192
+
let cid_str = cid.to_string();
193
+
let did_clone = self.did.clone();
284
194
285
-
// Ensure parent directory exists
286
-
if let Some(parent) = quarantined_path.parent() {
287
-
fs::create_dir_all(parent)
288
-
.await
289
-
.context("Failed to create quarantine directory")?;
290
-
}
291
-
292
-
// Move the blob if it exists
293
-
if stored_path.exists() {
294
-
// Read the bytes
295
-
let bytes = fs::read(&stored_path)
296
-
.await
297
-
.context("Failed to read stored blob")?;
298
-
299
-
// Write to quarantine location
300
-
fs::write(&quarantined_path, &bytes)
301
-
.await
302
-
.context("Failed to write quarantined blob")?;
303
-
304
-
// Update database metadata
305
-
let cid_str = cid.to_string();
306
-
let did_clone = self.did.clone();
307
-
let quarantined_path_str = quarantined_path.to_string_lossy().to_string();
308
-
309
-
self.db
310
-
.run(move |conn| {
311
-
diesel::update(blobs::table)
312
-
.filter(blobs::cid.eq(cid_str))
313
-
.filter(blobs::did.eq(did_clone))
314
-
.set((
315
-
blobs::path.eq(quarantined_path_str),
316
-
blobs::quarantined.eq(true),
317
-
))
318
-
.execute(conn)
319
-
.context("Failed to update blob metadata for quarantine")
320
-
})
321
-
.await?;
322
-
323
-
// Remove the original file
324
-
fs::remove_file(stored_path)
325
-
.await
326
-
.context("Failed to remove quarantined blob")?;
327
-
}
195
+
// Update the quarantine flag in the database
196
+
self.db
197
+
.run(move |conn| {
198
+
diesel::update(blobs::table)
199
+
.filter(blobs::cid.eq(&cid_str))
200
+
.filter(blobs::did.eq(&did_clone))
201
+
.set(blobs::quarantined.eq(true))
202
+
.execute(conn)
203
+
.context("Failed to quarantine blob")
204
+
})
205
+
.await?;
328
206
329
207
Ok(())
330
208
}
331
209
332
210
/// Unquarantine a blob
333
211
pub async fn unquarantine(&self, cid: Cid) -> Result<()> {
334
-
let quarantined_path = self.get_quarantined_path(&cid);
335
-
let stored_path = self.get_stored_path(&cid);
212
+
let cid_str = cid.to_string();
213
+
let did_clone = self.did.clone();
336
214
337
-
// Ensure parent directory exists
338
-
if let Some(parent) = stored_path.parent() {
339
-
fs::create_dir_all(parent)
340
-
.await
341
-
.context("Failed to create blocks directory")?;
342
-
}
343
-
344
-
// Move the blob if it exists
345
-
if quarantined_path.exists() {
346
-
// Read the bytes
347
-
let bytes = fs::read(&quarantined_path)
348
-
.await
349
-
.context("Failed to read quarantined blob")?;
350
-
351
-
// Write to normal location
352
-
fs::write(&stored_path, &bytes)
353
-
.await
354
-
.context("Failed to write unquarantined blob")?;
355
-
356
-
// Update database metadata
357
-
let stored_path_str = stored_path.to_string_lossy().to_string();
358
-
let cid_str = cid.to_string();
359
-
let did_clone = self.did.clone();
360
-
361
-
self.db
362
-
.run(move |conn| {
363
-
diesel::update(blobs::table)
364
-
.filter(blobs::cid.eq(cid_str))
365
-
.filter(blobs::did.eq(did_clone))
366
-
.set((
367
-
blobs::path.eq(stored_path_str),
368
-
blobs::quarantined.eq(false),
369
-
))
370
-
.execute(conn)
371
-
.context("Failed to update blob metadata for unquarantine")
372
-
})
373
-
.await?;
374
-
375
-
// Remove the quarantined file
376
-
fs::remove_file(quarantined_path)
377
-
.await
378
-
.context("Failed to remove from quarantine")?;
379
-
}
215
+
// Update the quarantine flag in the database
216
+
self.db
217
+
.run(move |conn| {
218
+
diesel::update(blobs::table)
219
+
.filter(blobs::cid.eq(&cid_str))
220
+
.filter(blobs::did.eq(&did_clone))
221
+
.set(blobs::quarantined.eq(false))
222
+
.execute(conn)
223
+
.context("Failed to unquarantine blob")
224
+
})
225
+
.await?;
380
226
381
227
Ok(())
382
228
}
383
229
384
230
/// Get a blob as a stream
385
-
pub async fn get_object(&self, cid_param: Cid) -> Result<ByteStream> {
231
+
pub async fn get_object(&self, blob_cid: Cid) -> Result<ByteStream> {
386
232
use self::blobs::dsl::*;
387
233
388
-
// Get the blob path from the database
389
-
let cid_string = cid_param.to_string();
234
+
let cid_str = blob_cid.to_string();
390
235
let did_clone = self.did.clone();
391
236
392
-
let blob_record = self
237
+
// Get the blob data from the database
238
+
let blob_data = self
393
239
.db
394
240
.run(move |conn| {
395
241
blobs
396
-
.filter(cid.eq(&cid_string))
242
+
.filter(self::blobs::cid.eq(&cid_str))
397
243
.filter(did.eq(&did_clone))
398
244
.filter(quarantined.eq(false))
399
-
.select(path)
400
-
.first::<String>(conn)
245
+
.select(data)
246
+
.first::<Vec<u8>>(conn)
401
247
.optional()
402
-
.context("Failed to query blob metadata")
248
+
.context("Failed to query blob data")
403
249
})
404
250
.await?;
405
251
406
-
if let Some(blob_path) = blob_record {
407
-
// Read the blob data
408
-
let bytes = fs::read(blob_path)
409
-
.await
410
-
.context("Failed to read blob data")?;
252
+
if let Some(bytes) = blob_data {
411
253
Ok(ByteStream::new(bytes))
412
254
} else {
413
-
anyhow::bail!("Blob not found: {:?}", cid)
255
+
anyhow::bail!("Blob not found: {}", blob_cid)
414
256
}
415
257
}
416
258
···
426
268
}
427
269
428
270
/// Delete a blob by CID string
429
-
pub async fn delete(&self, cid: String) -> Result<()> {
430
-
self.delete_key(self.get_stored_path(&Cid::from_str(&cid)?))
431
-
.await
271
+
pub async fn delete(&self, blob_cid: String) -> Result<()> {
272
+
use self::blobs::dsl::*;
273
+
274
+
let did_clone = self.did.clone();
275
+
276
+
// Delete from database
277
+
self.db
278
+
.run(move |conn| {
279
+
diesel::delete(blobs)
280
+
.filter(self::blobs::cid.eq(&blob_cid))
281
+
.filter(did.eq(&did_clone))
282
+
.execute(conn)
283
+
.context("Failed to delete blob")
284
+
})
285
+
.await?;
286
+
287
+
Ok(())
432
288
}
433
289
434
290
/// Delete multiple blobs by CID
435
291
pub async fn delete_many(&self, cids: Vec<Cid>) -> Result<()> {
436
-
for cid in cids {
437
-
self.delete_key(self.get_stored_path(&cid)).await?;
438
-
}
292
+
use self::blobs::dsl::*;
293
+
294
+
let cid_strings: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect();
295
+
let did_clone = self.did.clone();
296
+
297
+
// Delete all blobs in one operation
298
+
self.db
299
+
.run(move |conn| {
300
+
diesel::delete(blobs)
301
+
.filter(self::blobs::cid.eq_any(cid_strings))
302
+
.filter(did.eq(&did_clone))
303
+
.execute(conn)
304
+
.context("Failed to delete multiple blobs")
305
+
})
306
+
.await?;
307
+
439
308
Ok(())
440
309
}
441
310
442
311
/// Check if a blob is stored
443
-
pub async fn has_stored(&self, cid_param: Cid) -> Result<bool> {
312
+
pub async fn has_stored(&self, blob_cid: Cid) -> Result<bool> {
444
313
use self::blobs::dsl::*;
445
314
446
-
let cid_string = cid_param.to_string();
315
+
let cid_str = blob_cid.to_string();
447
316
let did_clone = self.did.clone();
448
317
449
318
let exists = self
···
451
320
.run(move |conn| {
452
321
diesel::select(diesel::dsl::exists(
453
322
blobs
454
-
.filter(cid.eq(&cid_string))
323
+
.filter(self::blobs::cid.eq(&cid_str))
455
324
.filter(did.eq(&did_clone))
456
325
.filter(temp.eq(false)),
457
326
))
···
465
334
466
335
/// Check if a temporary blob exists
467
336
pub async fn has_temp(&self, key: String) -> Result<bool> {
468
-
let tmp_path = self.get_tmp_path(&key);
469
-
Ok(tmp_path.exists())
470
-
}
337
+
use self::blobs::dsl::*;
471
338
472
-
/// Check if a blob exists by key
473
-
async fn has_key(&self, key_path: PathBuf) -> bool {
474
-
key_path.exists()
475
-
}
339
+
let did_clone = self.did.clone();
476
340
477
-
/// Delete a blob by its key path
478
-
async fn delete_key(&self, key_path: PathBuf) -> Result<()> {
479
-
use self::blobs::dsl::*;
480
-
481
-
// Delete from database first
482
-
let key_path_clone = key_path.clone();
483
-
self.db
341
+
let exists = self
342
+
.db
484
343
.run(move |conn| {
485
-
diesel::delete(blobs)
486
-
.filter(path.eq(key_path_clone.to_string_lossy().to_string()))
487
-
.execute(conn)
488
-
.context("Failed to delete blob metadata")
344
+
diesel::select(diesel::dsl::exists(
345
+
blobs
346
+
.filter(temp_key.eq(&key))
347
+
.filter(did.eq(&did_clone))
348
+
.filter(temp.eq(true)),
349
+
))
350
+
.get_result::<bool>(conn)
351
+
.context("Failed to check if temporary blob exists")
489
352
})
490
353
.await?;
491
354
492
-
// Then delete the file if it exists
493
-
if key_path.exists() {
494
-
fs::remove_file(key_path)
495
-
.await
496
-
.context("Failed to delete blob file")?;
497
-
}
498
-
499
-
Ok(())
500
-
}
501
-
502
-
/// Delete multiple blobs by key path
503
-
async fn delete_many_keys(&self, keys: Vec<String>) -> Result<()> {
504
-
for key in keys {
505
-
self.delete_key(PathBuf::from(key)).await?;
506
-
}
507
-
Ok(())
508
-
}
509
-
510
-
/// Move a blob from one location to another
511
-
async fn move_object(&self, keys: MoveObject) -> Result<()> {
512
-
let from_path = PathBuf::from(&keys.from);
513
-
let to_path = PathBuf::from(&keys.to);
514
-
515
-
// Ensure parent directory exists
516
-
if let Some(parent) = to_path.parent() {
517
-
fs::create_dir_all(parent)
518
-
.await
519
-
.context("Failed to create directory")?;
520
-
}
521
-
522
-
// Only move if the source exists
523
-
if from_path.exists() {
524
-
// Read the data
525
-
let data = fs::read(&from_path)
526
-
.await
527
-
.context("Failed to read source blob")?;
528
-
529
-
// Write to the destination
530
-
fs::write(&to_path, data)
531
-
.await
532
-
.context("Failed to write destination blob")?;
533
-
534
-
// Update the database record
535
-
let from_path_clone = from_path.clone();
536
-
self.db
537
-
.run(move |conn| {
538
-
diesel::update(blobs::table)
539
-
.filter(blobs::path.eq(from_path_clone.to_string_lossy().to_string()))
540
-
.set(blobs::path.eq(to_path.to_string_lossy().to_string()))
541
-
.execute(conn)
542
-
.context("Failed to update blob path")
543
-
})
544
-
.await?;
545
-
546
-
// Delete the source file
547
-
fs::remove_file(from_path)
548
-
.await
549
-
.context("Failed to remove source blob")?;
550
-
}
551
-
552
-
Ok(())
355
+
Ok(exists)
553
356
}
554
357
}