src/actor_store/blob.rs (+81 -59)
···
-//! Blob storage and retrieval for the actor store.
-//! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/blob/mod.rs
-//! blacksky-algorithms/rsky is licensed under the Apache License 2.0
-//!
-//! Modified for SQLite backend
-
 use std::sync::Arc;
 
-use anyhow::{Error, Result, bail};
+use anyhow::{Result, bail};
 use cidv10::Cid;
 use diesel::dsl::{count_distinct, exists, not};
 use diesel::sql_types::{Integer, Nullable, Text};
 use diesel::*;
-use futures::stream::{self, StreamExt};
-use futures::try_join;
-use rsky_pds::actor_store::blob::sha256_stream;
+use futures::{
+    stream::{self, StreamExt},
+    try_join,
+};
 // use rocket::data::{Data, ToByteUnit};
-// use rocket::form::validate::Contains;
 use rsky_common::ipld::sha256_raw_to_cid;
 use rsky_common::now;
 use rsky_lexicon::blob_refs::BlobRef;
 use rsky_lexicon::com::atproto::admin::StatusAttr;
 use rsky_lexicon::com::atproto::repo::ListMissingBlobsRefRecordBlob;
 use rsky_pds::actor_store::blob::{
-    BlobMetadata, GetBlobMetadataOutput, GetBlobOutput, ListBlobsOpts, ListMissingBlobsOpts,
+    BlobMetadata, GetBlobMetadataOutput, ListBlobsOpts, ListMissingBlobsOpts, sha256_stream,
     verify_blob,
 };
 use rsky_pds::image;
 use rsky_pds::models::models;
 use rsky_repo::error::BlobError;
 use rsky_repo::types::{PreparedBlobRef, PreparedWrite};
-use sha2::Digest;
 
-use super::sql_blob::BlobStoreSql;
+use super::sql_blob::{BlobStoreSql, ByteStream};
 use crate::db::DbConn;
 
+pub struct GetBlobOutput {
+    pub size: i32,
+    pub mime_type: Option<String>,
+    pub stream: ByteStream,
+}
+
+/// Handles blob operations for an actor store
 pub struct BlobReader {
+    /// SQL-based blob storage
     pub blobstore: BlobStoreSql,
+    /// DID of the actor
     pub did: String,
+    /// Database connection
     pub db: Arc<DbConn>,
 }
 
-// Basically handles getting blob records from db
 impl BlobReader {
+    /// Create a new blob reader
     pub fn new(blobstore: BlobStoreSql, db: Arc<DbConn>) -> Self {
-        // BlobReader {
-        //     did: blobstore.bucket.clone(),
-        //     blobstore,
-        //     db,
-        // }
-        todo!();
+        BlobReader {
+            did: blobstore.did.clone(),
+            blobstore,
+            db,
+        }
     }
 
+    /// Get metadata for a blob by CID
     pub async fn get_blob_metadata(&self, cid: Cid) -> Result<GetBlobMetadataOutput> {
         use rsky_pds::schema::pds::blob::dsl as BlobSchema;
 
77
80
}
78
81
}
79
82
83
+
/// Get a blob by CID with metadata and content
80
84
pub async fn get_blob(&self, cid: Cid) -> Result<GetBlobOutput> {
81
85
let metadata = self.get_blob_metadata(cid).await?;
82
-
// let blob_stream = match self.blobstore.get_stream(cid).await {
83
-
// Ok(res) => res,
84
-
// Err(e) => {
85
-
// return match e.downcast_ref() {
86
-
// Some(GetObjectError::NoSuchKey(key)) => {
87
-
// Err(anyhow::Error::new(GetObjectError::NoSuchKey(key.clone())))
88
-
// }
89
-
// _ => bail!(e.to_string()),
90
-
// };
91
-
// }
92
-
// };
93
-
// Ok(GetBlobOutput {
94
-
// size: metadata.size,
95
-
// mime_type: metadata.mime_type,
96
-
// stream: blob_stream,
97
-
// })
98
-
todo!();
86
+
let blob_stream = match self.blobstore.get_stream(cid).await {
87
+
Ok(stream) => stream,
88
+
Err(e) => bail!("Failed to get blob: {}", e),
89
+
};
90
+
91
+
Ok(GetBlobOutput {
92
+
size: metadata.size,
93
+
mime_type: metadata.mime_type,
94
+
stream: blob_stream,
95
+
})
99
96
}
100
97
98
+
/// Get all records that reference a specific blob
101
99
pub async fn get_records_for_blob(&self, cid: Cid) -> Result<Vec<String>> {
102
100
use rsky_pds::schema::pds::record_blob::dsl as RecordBlobSchema;
103
101
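
Note: with `ByteStream` now being the in-memory wrapper from sql_blob.rs, a caller of the new `get_blob` would look roughly like this (a sketch only; the `serve_blob` helper is hypothetical and not part of this diff):

    // Hypothetical caller of BlobReader::get_blob (illustration only).
    async fn serve_blob(reader: &BlobReader, cid: Cid) -> anyhow::Result<(Option<String>, Vec<u8>)> {
        let out = reader.get_blob(cid).await?;
        // collect() just hands back the buffered Vec<u8>.
        let body = out.stream.collect().await?;
        Ok((out.mime_type, body))
    }
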
···
                     .filter(RecordBlobSchema::did.eq(did))
                     .select(models::RecordBlob::as_select())
                     .get_results(conn)?;
-                Ok::<_, Error>(results.into_iter().map(|row| row.record_uri))
+                Ok::<_, result::Error>(results.into_iter().map(|row| row.record_uri))
             })
             .await?
             .collect::<Vec<String>>();
···
         Ok(res)
     }
 
+    /// Upload a blob and get its metadata
     pub async fn upload_blob_and_get_metadata(
         &self,
         user_suggested_mime: String,
-        blob: Data<'_>, // Type representing the body data of a request.
+        blob: Vec<u8>,
     ) -> Result<BlobMetadata> {
-        todo!();
-        let blob_stream = blob.open(100.mebibytes());
-        let bytes = blob_stream.into_bytes().await?;
-        let size = bytes.n.written;
-        let bytes = bytes.into_inner();
+        let bytes = blob;
+        let size = bytes.len() as i64;
+
         let (temp_key, sha256, img_info, sniffed_mime) = try_join!(
             self.blobstore.put_temp(bytes.clone()),
             sha256_stream(bytes.clone()),
···
 
         Ok(BlobMetadata {
             temp_key,
-            size: size as i64,
+            size,
             cid,
             mime_type,
             width: if let Some(ref info) = img_info {
···
         })
     }
 
+    /// Track a blob that hasn't been associated with any records yet
     pub async fn track_untethered_blob(&self, metadata: BlobMetadata) -> Result<BlobRef> {
         use rsky_pds::schema::pds::blob::dsl as BlobSchema;
 
···
                 ON CONFLICT (cid, did) DO UPDATE \
                 SET \"tempKey\" = EXCLUDED.\"tempKey\" \
                 WHERE pds.blob.\"tempKey\" is not null;");
+            #[expect(trivial_casts)]
             upsert
                 .bind::<Text, _>(&cid.to_string())
                 .bind::<Text, _>(&did)
···
         }).await
     }
 
+    /// Process blobs associated with writes
     pub async fn process_write_blobs(&self, writes: Vec<PreparedWrite>) -> Result<()> {
         self.delete_dereferenced_blobs(writes.clone()).await?;
+
         let _ = stream::iter(writes)
             .then(|write| async move {
                 Ok::<(), anyhow::Error>(match write {
···
             .await
             .into_iter()
             .collect::<Result<Vec<_>, _>>()?;
+
         Ok(())
     }
 
+    /// Delete blobs that are no longer referenced by any records
     pub async fn delete_dereferenced_blobs(&self, writes: Vec<PreparedWrite>) -> Result<()> {
         use rsky_pds::schema::pds::blob::dsl as BlobSchema;
         use rsky_pds::schema::pds::record_blob::dsl as RecordBlobSchema;
···
                 _ => None,
             })
             .collect();
+
         if uris.is_empty() {
             return Ok(());
         }
···
             .await?
             .into_iter()
             .collect::<Vec<models::RecordBlob>>();
+
         if deleted_repo_blobs.is_empty() {
             return Ok(());
         }
···
             .into_iter()
             .flat_map(|v: Vec<PreparedBlobRef>| v.into_iter().map(|b| b.cid.to_string()))
             .collect();
+
         let mut cids_to_keep = Vec::new();
         cids_to_keep.append(&mut new_blob_cids);
         cids_to_keep.append(&mut duplicated_cids);
···
         let cids_to_delete = deleted_repo_blob_cids
             .into_iter()
             .filter_map(|cid: String| match cids_to_keep.contains(&cid) {
-                true => Some(cid),
-                false => None,
+                true => None,
+                false => Some(cid),
             })
             .collect::<Vec<String>>();
+
         if cids_to_delete.is_empty() {
             return Ok(());
         }
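
The flipped match arms above are the key fix in this hunk: CIDs still present in `cids_to_keep` must be retained, and only unreferenced CIDs deleted. An equivalent way to write the corrected logic (sketch):

    // Equivalent to the fixed filter_map: anything on the keep-list survives.
    let cids_to_delete: Vec<String> = deleted_repo_blob_cids
        .into_iter()
        .filter(|cid| !cids_to_keep.contains(cid))
        .collect();
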
···
             })
             .await?;
 
-        // Original code queues a background job to delete by CID from S3 compatible blobstore
+        // Delete from blob storage
         let _ = stream::iter(cids_to_delete)
             .then(|cid| async { self.blobstore.delete(cid).await })
             .collect::<Vec<_>>()
             .await
             .into_iter()
             .collect::<Result<Vec<_>, _>>()?;
+
         Ok(())
     }
 
+    /// Verify a blob and make it permanent
     pub async fn verify_blob_and_make_permanent(&self, blob: PreparedBlobRef) -> Result<()> {
         use rsky_pds::schema::pds::blob::dsl as BlobSchema;
 
···
                     .optional()
             })
             .await?;
+
         if let Some(found) = found {
             verify_blob(&blob, &found).await?;
             if let Some(ref temp_key) = found.temp_key {
···
                 .await?;
             Ok(())
         } else {
-            bail!("Cound not find blob: {:?}", blob.cid.to_string())
+            bail!("Could not find blob: {:?}", blob.cid.to_string())
         }
     }
 
-    pub async fn associate_blob(&self, blob: PreparedBlobRef, _record_uri: String) -> Result<()> {
+    /// Associate a blob with a record
+    pub async fn associate_blob(&self, blob: PreparedBlobRef, record_uri: String) -> Result<()> {
         use rsky_pds::schema::pds::record_blob::dsl as RecordBlobSchema;
 
         let cid = blob.cid.to_string();
-        let record_uri = _record_uri;
         let did = self.did.clone();
+
         self.db
             .run(move |conn| {
                 insert_into(RecordBlobSchema::record_blob)
···
                     .execute(conn)
             })
             .await?;
+
         Ok(())
     }
 
+    /// Count all blobs for this actor
     pub async fn blob_count(&self) -> Result<i64> {
         use rsky_pds::schema::pds::blob::dsl as BlobSchema;
 
···
             .await
     }
 
+    /// Count blobs associated with records
     pub async fn record_blob_count(&self) -> Result<i64> {
         use rsky_pds::schema::pds::record_blob::dsl as RecordBlobSchema;
 
···
             .await
     }
 
+    /// List blobs that are referenced but missing
     pub async fn list_missing_blobs(
         &self,
         opts: ListMissingBlobsOpts,
···
             .await
     }
 
+    /// List all blobs with optional filtering
     pub async fn list_blobs(&self, opts: ListBlobsOpts) -> Result<Vec<String>> {
         use rsky_pds::schema::pds::record::dsl as RecordSchema;
         use rsky_pds::schema::pds::record_blob::dsl as RecordBlobSchema;
+
         let ListBlobsOpts {
             since,
             cursor,
···
             }
             self.db.run(move |conn| builder.load(conn)).await?
         };
+
         Ok(res)
     }
 
+    /// Get the takedown status of a blob
     pub async fn get_blob_takedown_status(&self, cid: Cid) -> Result<Option<StatusAttr>> {
         use rsky_pds::schema::pds::blob::dsl as BlobSchema;
 
···
                     .select(models::Blob::as_select())
                     .first(conn)
                     .optional()?;
+
                 match res {
                     None => Ok(None),
                     Some(res) => match res.takedown_ref {
···
             .await
     }
 
-    // Transactors
-    // -------------------
-
+    /// Update the takedown status of a blob
     pub async fn update_blob_takedown_status(&self, blob: Cid, takedown: StatusAttr) -> Result<()> {
         use rsky_pds::schema::pds::blob::dsl as BlobSchema;
 
···
             false => None,
         };
 
-        let blob = self
-            .db
+        let blob_cid = blob.to_string();
+        let did_clone = self.did.clone();
+
+        self.db
             .run(move |conn| {
                 update(BlobSchema::blob)
-                    .filter(BlobSchema::cid.eq(blob.to_string()))
+                    .filter(BlobSchema::cid.eq(blob_cid))
+                    .filter(BlobSchema::did.eq(did_clone))
                     .set(BlobSchema::takedownRef.eq(takedown_ref))
                     .execute(conn)?;
-                Ok::<_, Error>(blob)
+                Ok::<_, result::Error>(blob)
             })
             .await?;
 
···
             true => self.blobstore.quarantine(blob).await,
             false => self.blobstore.unquarantine(blob).await,
         };
+
         match res {
             Ok(_) => Ok(()),
             Err(e) => match e.downcast_ref() {
src/actor_store/sql_blob.rs (+472 -165)
···
+#![expect(
+    clippy::pub_use,
+    clippy::single_char_lifetime_names,
+    unused_qualifications
+)]
 use std::{path::PathBuf, str::FromStr as _, sync::Arc};
 
-use anyhow::Result;
+use anyhow::{Context, Result};
 use cidv10::Cid;
+use diesel::*;
 use rsky_common::get_random_str;
+use tokio::fs;
 
 use crate::db::DbConn;
 
 /// Type for stream of blob data
 pub type BlobStream = Box<dyn std::io::Read + Send>;
 
-/// Placeholder implementation for blob store
+/// ByteStream implementation for blob data
+pub struct ByteStream {
+    pub bytes: Vec<u8>,
+}
+
+impl ByteStream {
+    pub fn new(bytes: Vec<u8>) -> Self {
+        Self { bytes }
+    }
+
+    pub async fn collect(self) -> Result<Vec<u8>> {
+        Ok(self.bytes)
+    }
+}
+
+/// SQL-based implementation of blob storage
 #[derive(Clone)]
-pub(crate) struct BlobStoreSql {
-    client: Arc<DbConn>,
-    path: PathBuf,
+pub struct BlobStoreSql {
+    /// Database connection for metadata
+    pub db: Arc<DbConn>,
+    /// DID of the actor
+    pub did: String,
+    /// Path for blob storage
+    pub path: PathBuf,
 }
 
 /// Configuration for the blob store
-/// TODO: Implement this placeholder
-pub(crate) struct BlobConfig {
-    pub(crate) path: PathBuf,
+pub struct BlobConfig {
+    /// Base path for blob storage
+    pub path: PathBuf,
 }
 
-/// ByteStream
-/// TODO: Implement this placeholder
-pub(crate) struct ByteStream {
-    pub(crate) bytes: Vec<u8>,
+/// Represents a move operation for blobs
+struct MoveObject {
+    from: String,
+    to: String,
 }
-impl ByteStream {
-    pub async fn collect(self) -> Result<Vec<u8>> {
-        Ok(self.bytes)
+
+/// Blob table structure for SQL operations
+#[derive(Queryable, Insertable, Debug)]
+#[diesel(table_name = blobs)]
+struct BlobEntry {
+    cid: String,
+    did: String,
+    path: String,
+    size: i32,
+    mime_type: String,
+    quarantined: bool,
+    temp: bool,
+}
+
+// Table definition for blobs
+table! {
+    blobs (cid, did) {
+        cid -> Text,
+        did -> Text,
+        path -> Text,
+        size -> Integer,
+        mime_type -> Text,
+        quarantined -> Bool,
+        temp -> Bool,
     }
 }
 
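
The diff does not include a migration for the new `blobs` table; assuming SQLite, the schema the `table!` definition expects could be created roughly like this (a sketch expressed via `diesel::sql_query`, not code from this PR):

    use diesel::{sql_query, sqlite::SqliteConnection, RunQueryDsl as _};

    // Hypothetical one-off setup matching the `table!` definition above.
    fn create_blobs_table(conn: &mut SqliteConnection) -> diesel::QueryResult<usize> {
        sql_query(
            "CREATE TABLE IF NOT EXISTS blobs (
                cid TEXT NOT NULL,
                did TEXT NOT NULL,
                path TEXT NOT NULL,
                size INTEGER NOT NULL,
                mime_type TEXT NOT NULL,
                quarantined BOOLEAN NOT NULL DEFAULT 0,
                temp BOOLEAN NOT NULL DEFAULT 0,
                PRIMARY KEY (cid, did)
            )",
        )
        .execute(conn)
    }
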
 impl BlobStoreSql {
-    pub fn new(did: String, cfg: &BlobConfig) -> Self {
-        // let client = aws_sdk_s3::Client::new(cfg);
-        // BlobStorePlaceholder {
-        //     client,
-        //     bucket: did,
-        // }
-        todo!();
+    /// Create a new SQL-based blob store for the given DID
+    pub fn new(did: String, cfg: &BlobConfig, db: Arc<DbConn>) -> Self {
+        let actor_path = cfg.path.join(&did);
+
+        // Create actor directory if it doesn't exist
+        if !actor_path.exists() {
+            // Use blocking to avoid complicating this constructor
+            std::fs::create_dir_all(&actor_path).unwrap_or_else(|_| {
+                panic!("Failed to create blob directory: {}", actor_path.display())
+            });
+        }
+
+        BlobStoreSql {
+            db,
+            did,
+            path: actor_path,
+        }
     }
 
-    pub fn creator(cfg: &BlobConfig) -> Box<dyn Fn(String) -> BlobStoreSql + '_> {
-        Box::new(move |did: String| BlobStoreSql::new(did, cfg))
+    /// Create a factory function for blob stores
+    pub fn creator(cfg: &BlobConfig, db: Arc<DbConn>) -> Box<dyn Fn(String) -> BlobStoreSql + '_> {
+        let db_clone = db.clone();
+        Box::new(move |did: String| BlobStoreSql::new(did, cfg, db_clone.clone()))
     }
 
+    /// Generate a random key for temporary blobs
     fn gen_key(&self) -> String {
         get_random_str()
     }
 
-    fn get_tmp_path(&self, key: &String) -> String {
-        // format!("tmp/{0}/{1}", self.bucket, key)
-        todo!();
+    /// Get the path for a temporary blob
+    fn get_tmp_path(&self, key: &str) -> PathBuf {
+        self.path.join("tmp").join(key)
     }
 
-    fn get_stored_path(&self, cid: Cid) -> String {
-        // format!("blocks/{0}/{1}", self.bucket, cid)
-        todo!();
+    /// Get the filesystem path for a stored blob
+    fn get_stored_path(&self, cid: &Cid) -> PathBuf {
+        self.path.join("blocks").join(cid.to_string())
     }
 
-    fn get_quarantined_path(&self, cid: Cid) -> String {
-        // format!("quarantine/{0}/{1}", self.bucket, cid)
-        todo!();
+    /// Get the filesystem path for a quarantined blob
+    fn get_quarantined_path(&self, cid: &Cid) -> PathBuf {
+        self.path.join("quarantine").join(cid.to_string())
     }
 
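
Constructing a store then looks roughly like this (sketch; the base path and the way the `DbConn` handle is obtained are assumptions):

    // Hypothetical wiring for a single actor's store.
    fn make_store(db: Arc<DbConn>) -> BlobStoreSql {
        let cfg = BlobConfig { path: PathBuf::from("/var/pds/blobs") };
        BlobStoreSql::new("did:plc:example".to_string(), &cfg, db)
    }

Each actor gets its own directory under the configured base path, with `tmp/`, `blocks/`, and `quarantine/` subdirectories created lazily by the write paths below.
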
+    /// Store a blob temporarily
     pub async fn put_temp(&self, bytes: Vec<u8>) -> Result<String> {
         let key = self.gen_key();
-        // let body = ByteStream::from(bytes);
-        // self.client
-        //     .put_object()
-        //     .body(body)
-        //     .bucket(&self.bucket)
-        //     .key(self.get_tmp_path(&key))
-        //     .acl(ObjectCannedAcl::PublicRead)
-        //     .send()
-        //     .await?;
-        // Ok(key)
-        todo!();
+        let tmp_path = self.get_tmp_path(&key);
+
+        // Ensure the directory exists
+        if let Some(parent) = tmp_path.parent() {
+            fs::create_dir_all(parent)
+                .await
+                .context("Failed to create temp directory")?;
+        }
+
+        // Write the blob data to the file
+        fs::write(&tmp_path, &bytes)
+            .await
+            .context("Failed to write temporary blob")?;
+
+        // Clone values to be used in the closure
+        let did_clone = self.did.clone();
+        let tmp_path_str = tmp_path.to_string_lossy().to_string();
+        let bytes_len = bytes.len() as i32;
+        // Placeholder CID, rewritten by make_permanent. It must be unique per
+        // temp blob: a constant like "temp" would collide on the (cid, did)
+        // primary key as soon as an actor stages a second upload.
+        let temp_cid = format!("temp-{key}");
+
+        // Store metadata in the database (will be updated when made permanent)
+        self.db
+            .run(move |conn| {
+                let entry = BlobEntry {
+                    cid: temp_cid,
+                    did: did_clone,
+                    path: tmp_path_str,
+                    size: bytes_len,
+                    mime_type: "application/octet-stream".to_string(), // Will be updated when made permanent
+                    quarantined: false,
+                    temp: true,
+                };
+
+                diesel::insert_into(blobs::table)
+                    .values(&entry)
+                    .execute(conn)
+                    .context("Failed to insert temporary blob metadata")
+            })
+            .await?;
+
+        Ok(key)
     }
 
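
Together with `make_permanent` below, this gives the two-phase write the rest of the PDS relies on (sketch of the intended call sequence, using only names from this diff):

    // Stage bytes under tmp/<key>, then promote them to blocks/<cid>.
    async fn store_blob(store: &BlobStoreSql, cid: Cid, bytes: Vec<u8>) -> anyhow::Result<()> {
        let temp_key = store.put_temp(bytes).await?;
        store.make_permanent(temp_key, cid).await?;
        Ok(())
    }
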
+    /// Make a temporary blob permanent
     pub async fn make_permanent(&self, key: String, cid: Cid) -> Result<()> {
-        // let already_has = self.has_stored(cid).await?;
-        // if !already_has {
-        //     Ok(self
-        //         .move_object(MoveObject {
-        //             from: self.get_tmp_path(&key),
-        //             to: self.get_stored_path(cid),
-        //         })
-        //         .await?)
-        // } else {
-        //     // already saved, so we no-op & just delete the temp
-        //     Ok(self.delete_key(self.get_tmp_path(&key)).await?)
-        // }
-        todo!();
+        let already_has = self.has_stored(cid).await?;
+        if !already_has {
+            let tmp_path = self.get_tmp_path(&key);
+            let stored_path = self.get_stored_path(&cid);
+
+            // Ensure parent directory exists
+            if let Some(parent) = stored_path.parent() {
+                fs::create_dir_all(parent)
+                    .await
+                    .context("Failed to create blocks directory")?;
+            }
+
+            // Read the bytes
+            let bytes = fs::read(&tmp_path)
+                .await
+                .context("Failed to read temporary blob")?;
+
+            // Write to permanent location
+            fs::write(&stored_path, &bytes)
+                .await
+                .context("Failed to write permanent blob")?;
+
+            // Update database metadata
+            let tmp_path_clone = tmp_path.clone();
+            self.db
+                .run(move |conn| {
+                    // Update the entry with the correct CID and path
+                    diesel::update(blobs::table)
+                        .filter(blobs::path.eq(tmp_path_clone.to_string_lossy().to_string()))
+                        .set((
+                            blobs::cid.eq(cid.to_string()),
+                            blobs::path.eq(stored_path.to_string_lossy().to_string()),
+                            blobs::temp.eq(false),
+                        ))
+                        .execute(conn)
+                        .context("Failed to update blob metadata")
+                })
+                .await?;
+
+            // Remove the temporary file
+            fs::remove_file(tmp_path)
+                .await
+                .context("Failed to remove temporary blob")?;
+
+            Ok(())
+        } else {
+            // Already saved, so delete the temp file
+            let tmp_path = self.get_tmp_path(&key);
+            if tmp_path.exists() {
+                fs::remove_file(tmp_path)
+                    .await
+                    .context("Failed to remove existing temporary blob")?;
+            }
+            Ok(())
+        }
     }
 
+    /// Store a blob directly as permanent
     pub async fn put_permanent(&self, cid: Cid, bytes: Vec<u8>) -> Result<()> {
-        // let body = ByteStream::from(bytes);
-        // self.client
-        //     .put_object()
-        //     .body(body)
-        //     .bucket(&self.bucket)
-        //     .key(self.get_stored_path(cid))
-        //     .acl(ObjectCannedAcl::PublicRead)
-        //     .send()
-        //     .await?;
-        // Ok(())
-        todo!();
+        let stored_path = self.get_stored_path(&cid);
+
+        // Ensure parent directory exists
+        if let Some(parent) = stored_path.parent() {
+            fs::create_dir_all(parent)
+                .await
+                .context("Failed to create blocks directory")?;
+        }
+
+        // Write to permanent location
+        fs::write(&stored_path, &bytes)
+            .await
+            .context("Failed to write permanent blob")?;
+
+        let stored_path_str = stored_path.to_string_lossy().to_string();
+        let cid_str = cid.to_string();
+        let did_clone = self.did.clone();
+        let bytes_len = bytes.len() as i32;
+
+        // Update database metadata
+        self.db
+            .run(move |conn| {
+                let entry = BlobEntry {
+                    cid: cid_str,
+                    did: did_clone,
+                    path: stored_path_str.clone(),
+                    size: bytes_len,
+                    mime_type: "application/octet-stream".to_string(), // Could be improved with MIME detection
+                    quarantined: false,
+                    temp: false,
+                };
+
+                diesel::insert_into(blobs::table)
+                    .values(&entry)
+                    .on_conflict((blobs::cid, blobs::did))
+                    .do_update()
+                    .set(blobs::path.eq(stored_path_str))
+                    .execute(conn)
+                    .context("Failed to insert permanent blob metadata")
+            })
+            .await?;
+
+        Ok(())
     }
 
+    /// Quarantine a blob
     pub async fn quarantine(&self, cid: Cid) -> Result<()> {
-        // self.move_object(MoveObject {
-        //     from: self.get_stored_path(cid),
-        //     to: self.get_quarantined_path(cid),
-        // })
-        // .await
-        todo!();
+        let stored_path = self.get_stored_path(&cid);
+        let quarantined_path = self.get_quarantined_path(&cid);
+
+        // Ensure parent directory exists
+        if let Some(parent) = quarantined_path.parent() {
+            fs::create_dir_all(parent)
+                .await
+                .context("Failed to create quarantine directory")?;
+        }
+
+        // Move the blob if it exists
+        if stored_path.exists() {
+            // Read the bytes
+            let bytes = fs::read(&stored_path)
+                .await
+                .context("Failed to read stored blob")?;
+
+            // Write to quarantine location
+            fs::write(&quarantined_path, &bytes)
+                .await
+                .context("Failed to write quarantined blob")?;
+
+            // Update database metadata
+            let cid_str = cid.to_string();
+            let did_clone = self.did.clone();
+            let quarantined_path_str = quarantined_path.to_string_lossy().to_string();
+
+            self.db
+                .run(move |conn| {
+                    diesel::update(blobs::table)
+                        .filter(blobs::cid.eq(cid_str))
+                        .filter(blobs::did.eq(did_clone))
+                        .set((
+                            blobs::path.eq(quarantined_path_str),
+                            blobs::quarantined.eq(true),
+                        ))
+                        .execute(conn)
+                        .context("Failed to update blob metadata for quarantine")
+                })
+                .await?;
+
+            // Remove the original file
+            fs::remove_file(stored_path)
+                .await
+                .context("Failed to remove quarantined blob")?;
+        }
+
+        Ok(())
     }
 
+    /// Unquarantine a blob
     pub async fn unquarantine(&self, cid: Cid) -> Result<()> {
-        // self.move_object(MoveObject {
-        //     from: self.get_quarantined_path(cid),
-        //     to: self.get_stored_path(cid),
-        // })
-        // .await
-        todo!();
+        let quarantined_path = self.get_quarantined_path(&cid);
+        let stored_path = self.get_stored_path(&cid);
+
+        // Ensure parent directory exists
+        if let Some(parent) = stored_path.parent() {
+            fs::create_dir_all(parent)
+                .await
+                .context("Failed to create blocks directory")?;
+        }
+
+        // Move the blob if it exists
+        if quarantined_path.exists() {
+            // Read the bytes
+            let bytes = fs::read(&quarantined_path)
+                .await
+                .context("Failed to read quarantined blob")?;
+
+            // Write to normal location
+            fs::write(&stored_path, &bytes)
+                .await
+                .context("Failed to write unquarantined blob")?;
+
+            // Update database metadata
+            let stored_path_str = stored_path.to_string_lossy().to_string();
+            let cid_str = cid.to_string();
+            let did_clone = self.did.clone();
+
+            self.db
+                .run(move |conn| {
+                    diesel::update(blobs::table)
+                        .filter(blobs::cid.eq(cid_str))
+                        .filter(blobs::did.eq(did_clone))
+                        .set((
+                            blobs::path.eq(stored_path_str),
+                            blobs::quarantined.eq(false),
+                        ))
+                        .execute(conn)
+                        .context("Failed to update blob metadata for unquarantine")
+                })
+                .await?;
+
+            // Remove the quarantined file
+            fs::remove_file(quarantined_path)
+                .await
+                .context("Failed to remove from quarantine")?;
+        }
+
+        Ok(())
     }
 
-    async fn get_object(&self, cid: Cid) -> Result<ByteStream> {
-        // let res = self
-        //     .client
-        //     .get_object()
-        //     .bucket(&self.bucket)
-        //     .key(self.get_stored_path(cid))
-        //     .send()
-        //     .await;
-        // match res {
-        //     Ok(res) => Ok(res.body),
-        //     Err(SdkError::ServiceError(s)) => Err(anyhow::Error::new(s.into_err())),
-        //     Err(e) => Err(anyhow::Error::new(e.into_service_error())),
-        // }
-        todo!();
+    /// Get a blob as a stream
+    pub async fn get_object(&self, cid_param: Cid) -> Result<ByteStream> {
+        use self::blobs::dsl::*;
+
+        // Get the blob path from the database
+        let cid_string = cid_param.to_string();
+        let did_clone = self.did.clone();
+
+        let blob_record = self
+            .db
+            .run(move |conn| {
+                blobs
+                    .filter(cid.eq(&cid_string))
+                    .filter(did.eq(&did_clone))
+                    .filter(quarantined.eq(false))
+                    .select(path)
+                    .first::<String>(conn)
+                    .optional()
+                    .context("Failed to query blob metadata")
+            })
+            .await?;
+
+        if let Some(blob_path) = blob_record {
+            // Read the blob data
+            let bytes = fs::read(blob_path)
+                .await
+                .context("Failed to read blob data")?;
+            Ok(ByteStream::new(bytes))
+        } else {
+            // Report the requested CID, not the `cid` column that the dsl
+            // glob import brings into scope here.
+            anyhow::bail!("Blob not found: {:?}", cid_param)
+        }
     }
 
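
Because `get_object` filters on `quarantined.eq(false)`, a quarantined blob is indistinguishable from a missing one for readers. A caller that needs to tell them apart would have to query the flag directly; inside this module that could look like (sketch, not part of this diff):

    /// Hypothetical helper: is there a quarantined row for this CID?
    pub async fn is_quarantined(&self, cid_param: Cid) -> Result<bool> {
        use self::blobs::dsl::*;

        let cid_string = cid_param.to_string();
        let did_clone = self.did.clone();
        self.db
            .run(move |conn| {
                diesel::select(diesel::dsl::exists(
                    blobs
                        .filter(cid.eq(&cid_string))
                        .filter(did.eq(&did_clone))
                        .filter(quarantined.eq(true)),
                ))
                .get_result::<bool>(conn)
                .context("Failed to check quarantine status")
            })
            .await
    }
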
+    /// Get blob bytes
     pub async fn get_bytes(&self, cid: Cid) -> Result<Vec<u8>> {
-        let res = self.get_object(cid).await?;
-        // let bytes = res.collect().await.map(|data| data.into_bytes())?;
-        // Ok(bytes.to_vec())
-        todo!();
+        let stream = self.get_object(cid).await?;
+        stream.collect().await
     }
 
+    /// Get a blob as a stream
     pub async fn get_stream(&self, cid: Cid) -> Result<ByteStream> {
         self.get_object(cid).await
     }
 
+    /// Delete a blob by CID string
     pub async fn delete(&self, cid: String) -> Result<()> {
-        self.delete_key(self.get_stored_path(Cid::from_str(&cid)?))
+        self.delete_key(self.get_stored_path(&Cid::from_str(&cid)?))
             .await
     }
 
+    /// Delete multiple blobs by CID
     pub async fn delete_many(&self, cids: Vec<Cid>) -> Result<()> {
-        let keys: Vec<String> = cids
-            .into_iter()
-            .map(|cid| self.get_stored_path(cid))
-            .collect();
-        self.delete_many_keys(keys).await
+        for cid in cids {
+            self.delete_key(self.get_stored_path(&cid)).await?;
+        }
+        Ok(())
     }
 
441
172
-
pub async fn has_stored(&self, cid: Cid) -> Result<bool> {
173
-
Ok(self.has_key(self.get_stored_path(cid)).await)
442
+
/// Check if a blob is stored
443
+
pub async fn has_stored(&self, cid_param: Cid) -> Result<bool> {
444
+
use self::blobs::dsl::*;
445
+
446
+
let cid_string = cid_param.to_string();
447
+
let did_clone = self.did.clone();
448
+
449
+
let exists = self
450
+
.db
451
+
.run(move |conn| {
452
+
diesel::select(diesel::dsl::exists(
453
+
blobs
454
+
.filter(cid.eq(&cid_string))
455
+
.filter(did.eq(&did_clone))
456
+
.filter(temp.eq(false)),
457
+
))
458
+
.get_result::<bool>(conn)
459
+
.context("Failed to check if blob exists")
460
+
})
461
+
.await?;
462
+
463
+
Ok(exists)
174
464
}
175
465
466
+
/// Check if a temporary blob exists
176
467
pub async fn has_temp(&self, key: String) -> Result<bool> {
177
-
Ok(self.has_key(self.get_tmp_path(&key)).await)
468
+
let tmp_path = self.get_tmp_path(&key);
469
+
Ok(tmp_path.exists())
178
470
}
179
471
180
-
async fn has_key(&self, key: String) -> bool {
181
-
// let res = self
182
-
// .client
183
-
// .head_object()
184
-
// .bucket(&self.bucket)
185
-
// .key(key)
186
-
// .send()
187
-
// .await;
188
-
// res.is_ok()
189
-
todo!();
472
+
/// Check if a blob exists by key
473
+
async fn has_key(&self, key_path: PathBuf) -> bool {
474
+
key_path.exists()
190
475
}
191
476
-    async fn delete_key(&self, key: String) -> Result<()> {
-        // self.client
-        //     .delete_object()
-        //     .bucket(&self.bucket)
-        //     .key(key)
-        //     .send()
-        //     .await?;
-        // Ok(())
-        todo!();
+    /// Delete a blob by its key path
+    async fn delete_key(&self, key_path: PathBuf) -> Result<()> {
+        use self::blobs::dsl::*;
+
+        // Delete from database first
+        let key_path_clone = key_path.clone();
+        self.db
+            .run(move |conn| {
+                diesel::delete(blobs)
+                    .filter(path.eq(key_path_clone.to_string_lossy().to_string()))
+                    .execute(conn)
+                    .context("Failed to delete blob metadata")
+            })
+            .await?;
+
+        // Then delete the file if it exists
+        if key_path.exists() {
+            fs::remove_file(key_path)
+                .await
+                .context("Failed to delete blob file")?;
+        }
+
+        Ok(())
     }
 
+    /// Delete multiple blobs by key path
     async fn delete_many_keys(&self, keys: Vec<String>) -> Result<()> {
-        // let objects: Vec<ObjectIdentifier> = keys
-        //     .into_iter()
-        //     .map(|key| Ok(ObjectIdentifier::builder().key(key).build()?))
-        //     .collect::<Result<Vec<ObjectIdentifier>>>()?;
-        // let deletes = Delete::builder().set_objects(Some(objects)).build()?;
-        // self.client
-        //     .delete_objects()
-        //     .bucket(&self.bucket)
-        //     .delete(deletes)
-        //     .send()
-        //     .await?;
-        // Ok(())
-        todo!();
+        for key in keys {
+            self.delete_key(PathBuf::from(key)).await?;
+        }
+        Ok(())
     }
 
+    /// Move a blob from one location to another
     async fn move_object(&self, keys: MoveObject) -> Result<()> {
-        // self.client
-        //     .copy_object()
-        //     .bucket(&self.bucket)
-        //     .copy_source(format!(
-        //         "{0}/{1}/{2}",
-        //         env_str("AWS_ENDPOINT_BUCKET").unwrap(),
-        //         self.bucket,
-        //         keys.from
-        //     ))
-        //     .key(keys.to)
-        //     .acl(ObjectCannedAcl::PublicRead)
-        //     .send()
-        //     .await?;
-        // self.client
-        //     .delete_object()
-        //     .bucket(&self.bucket)
-        //     .key(keys.from)
-        //     .send()
-        //     .await?;
-        // Ok(())
-        todo!();
+        let from_path = PathBuf::from(&keys.from);
+        let to_path = PathBuf::from(&keys.to);
+
+        // Ensure parent directory exists
+        if let Some(parent) = to_path.parent() {
+            fs::create_dir_all(parent)
+                .await
+                .context("Failed to create directory")?;
+        }
+
+        // Only move if the source exists
+        if from_path.exists() {
+            // Read the data
+            let data = fs::read(&from_path)
+                .await
+                .context("Failed to read source blob")?;
+
+            // Write to the destination
+            fs::write(&to_path, data)
+                .await
+                .context("Failed to write destination blob")?;
+
+            // Update the database record
+            let from_path_clone = from_path.clone();
+            self.db
+                .run(move |conn| {
+                    diesel::update(blobs::table)
+                        .filter(blobs::path.eq(from_path_clone.to_string_lossy().to_string()))
+                        .set(blobs::path.eq(to_path.to_string_lossy().to_string()))
+                        .execute(conn)
+                        .context("Failed to update blob path")
+                })
+                .await?;
+
+            // Delete the source file
+            fs::remove_file(from_path)
+                .await
+                .context("Failed to remove source blob")?;
+        }
+
+        Ok(())
     }
 }
-
-struct MoveObject {
-    from: String,
-    to: String,
-}
src/actor_store/sql_repo.rs (+25 -23)
···
 }
 
 impl ReadableBlockstore for SqlRepoReader {
-    fn get_bytes<'a>(
-        &'a self,
-        cid: &'a Cid,
-    ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>>> + Send + Sync + 'a>> {
+    fn get_bytes<'life>(
+        &'life self,
+        cid: &'life Cid,
+    ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>>> + Send + Sync + 'life>> {
         let did: String = self.did.clone();
         let db: Arc<DbConn> = self.db.clone();
         let cid = cid.clone();
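
The only change in this file is the rename of the boxed-future lifetime from `'a` to `'life`, presumably to satisfy the `clippy::single_char_lifetime_names` lint that sql_blob.rs now lists in its `#![expect(...)]`. The signature shape itself is unchanged; the general pattern is (illustration only, not code from this PR):

    // A borrowed receiver flowing into a boxed future with a named lifetime.
    fn fetch<'conn>(&'conn self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + Sync + 'conn>> {
        Box::pin(async move { Ok(Vec::new()) })
    }
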
···
         })
     }
 
-    fn has<'a>(
-        &'a self,
+    fn has<'life>(
+        &'life self,
         cid: Cid,
-    ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + Sync + 'a>> {
+    ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + Sync + 'life>> {
         Box::pin(async move {
             let got = <Self as ReadableBlockstore>::get_bytes(self, &cid).await?;
             Ok(got.is_some())
         })
     }
 
-    fn get_blocks<'a>(
-        &'a self,
+    fn get_blocks<'life>(
+        &'life self,
         cids: Vec<Cid>,
-    ) -> Pin<Box<dyn Future<Output = Result<BlocksAndMissing>> + Send + Sync + 'a>> {
+    ) -> Pin<Box<dyn Future<Output = Result<BlocksAndMissing>> + Send + Sync + 'life>> {
         let did: String = self.did.clone();
         let db: Arc<DbConn> = self.db.clone();
 
···
 }
 
 impl RepoStorage for SqlRepoReader {
-    fn get_root<'a>(&'a self) -> Pin<Box<dyn Future<Output = Option<Cid>> + Send + Sync + 'a>> {
+    fn get_root<'life>(
+        &'life self,
+    ) -> Pin<Box<dyn Future<Output = Option<Cid>> + Send + Sync + 'life>> {
         Box::pin(async move {
             match self.get_root_detailed().await {
                 Ok(root) => Some(root.cid),
···
         })
     }
 
-    fn put_block<'a>(
-        &'a self,
+    fn put_block<'life>(
+        &'life self,
         cid: Cid,
         bytes: Vec<u8>,
         rev: String,
-    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'a>> {
+    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'life>> {
         let did: String = self.did.clone();
         let db: Arc<DbConn> = self.db.clone();
         let bytes_cloned = bytes.clone();
···
         })
     }
 
-    fn put_many<'a>(
-        &'a self,
+    fn put_many<'life>(
+        &'life self,
         to_put: BlockMap,
         rev: String,
-    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'a>> {
+    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'life>> {
         let did: String = self.did.clone();
         let db: Arc<DbConn> = self.db.clone();
 
···
             Ok(())
         })
     }
-    fn update_root<'a>(
-        &'a self,
+    fn update_root<'life>(
+        &'life self,
         cid: Cid,
         rev: String,
         is_create: Option<bool>,
-    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'a>> {
+    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'life>> {
         let did: String = self.did.clone();
         let db: Arc<DbConn> = self.db.clone();
         let now: String = self.now.clone();
···
         })
     }
 
-    fn apply_commit<'a>(
-        &'a self,
+    fn apply_commit<'life>(
+        &'life self,
         commit: CommitData,
         is_create: Option<bool>,
-    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'a>> {
+    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'life>> {
         Box::pin(async move {
             self.update_root(commit.cid, commit.rev.clone(), is_create)
                 .await?;
src/endpoints/mod.rs (+2 -2)
···
 //! Root module for all endpoints.
 mod identity;
-// mod repo;
+mod repo;
 mod server;
 mod sync;
 
···
     Router::new()
         .route("/_health", get(health))
         .merge(identity::routes()) // com.atproto.identity
-        // .merge(repo::routes()) // com.atproto.repo
+        .merge(repo::routes()) // com.atproto.repo
         .merge(server::routes()) // com.atproto.server
         .merge(sync::routes()) // com.atproto.sync
 }