+97
-75
src/actor_store/blob.rs
+97
-75
src/actor_store/blob.rs
···
26
26
use rsky_pds::models::models;
27
27
use rsky_repo::error::BlobError;
28
28
use rsky_repo::types::{PreparedBlobRef, PreparedWrite};
29
+
use std::str::FromStr as _;
29
30
use std::sync::Arc;
30
31
31
32
use super::sql_blob::{BlobStoreSql, ByteStream};
···
244
245
use rsky_pds::schema::pds::blob::dsl as BlobSchema;
245
246
use rsky_pds::schema::pds::record_blob::dsl as RecordBlobSchema;
246
247
248
+
// Extract URIs
247
249
let uris: Vec<String> = writes
248
-
.clone()
249
-
.into_iter()
250
+
.iter()
250
251
.filter_map(|w| match w {
251
-
PreparedWrite::Delete(w) => Some(w.uri),
252
-
PreparedWrite::Update(w) => Some(w.uri),
252
+
PreparedWrite::Delete(w) => Some(w.uri.clone()),
253
+
PreparedWrite::Update(w) => Some(w.uri.clone()),
253
254
_ => None,
254
255
})
255
256
.collect();
···
258
259
return Ok(());
259
260
}
260
261
262
+
// In SQLite, we can't do DELETE...RETURNING
263
+
// So we need to fetch the records first, then delete
264
+
let did = self.did.clone();
265
+
let uris_clone = uris.clone();
261
266
let deleted_repo_blobs: Vec<models::RecordBlob> = self
262
267
.db
263
268
.run(move |conn| {
264
-
delete(RecordBlobSchema::record_blob)
265
-
.filter(RecordBlobSchema::recordUri.eq_any(uris))
266
-
.get_results(conn)
269
+
RecordBlobSchema::record_blob
270
+
.filter(RecordBlobSchema::recordUri.eq_any(&uris_clone))
271
+
.filter(RecordBlobSchema::did.eq(&did))
272
+
.load::<models::RecordBlob>(conn)
267
273
})
268
-
.await?
269
-
.into_iter()
270
-
.collect::<Vec<models::RecordBlob>>();
274
+
.await?;
271
275
272
276
if deleted_repo_blobs.is_empty() {
273
277
return Ok(());
274
278
}
275
279
280
+
// Now perform the delete
281
+
let uris_clone = uris.clone();
282
+
self.db
283
+
.run(move |conn| {
284
+
delete(RecordBlobSchema::record_blob)
285
+
.filter(RecordBlobSchema::recordUri.eq_any(uris_clone))
286
+
.execute(conn)
287
+
})
288
+
.await?;
289
+
290
+
// Extract blob cids from the deleted records
276
291
let deleted_repo_blob_cids: Vec<String> = deleted_repo_blobs
277
292
.into_iter()
278
293
.map(|row| row.blob_cid)
279
-
.collect::<Vec<String>>();
294
+
.collect();
280
295
281
-
let x = deleted_repo_blob_cids.clone();
282
-
let mut duplicated_cids: Vec<String> = self
296
+
// Find duplicates (blobs referenced by other records)
297
+
let cids_clone = deleted_repo_blob_cids.clone();
298
+
let did_clone = self.did.clone();
299
+
let duplicated_cids: Vec<String> = self
283
300
.db
284
301
.run(move |conn| {
285
302
RecordBlobSchema::record_blob
303
+
.filter(RecordBlobSchema::blobCid.eq_any(cids_clone))
304
+
.filter(RecordBlobSchema::did.eq(did_clone))
286
305
.select(RecordBlobSchema::blobCid)
287
-
.filter(RecordBlobSchema::blobCid.eq_any(&x))
288
-
.load(conn)
306
+
.load::<String>(conn)
289
307
})
290
-
.await?
291
-
.into_iter()
292
-
.collect::<Vec<String>>();
308
+
.await?;
293
309
294
-
let mut new_blob_cids: Vec<String> = writes
295
-
.into_iter()
296
-
.map(|w| match w {
297
-
PreparedWrite::Create(w) => w.blobs,
298
-
PreparedWrite::Update(w) => w.blobs,
299
-
PreparedWrite::Delete(_) => Vec::new(),
310
+
// Extract new blob cids from writes (creates and updates)
311
+
let new_blob_cids: Vec<String> = writes
312
+
.iter()
313
+
.flat_map(|w| match w {
314
+
PreparedWrite::Create(w) => w.blobs.clone(),
315
+
PreparedWrite::Update(w) => w.blobs.clone(),
316
+
_ => Vec::new(),
300
317
})
301
-
.collect::<Vec<Vec<PreparedBlobRef>>>()
302
-
.into_iter()
303
-
.flat_map(|v: Vec<PreparedBlobRef>| v.into_iter().map(|b| b.cid.to_string()))
318
+
.map(|b| b.cid.to_string())
304
319
.collect();
305
320
306
-
let mut cids_to_keep = Vec::new();
307
-
cids_to_keep.append(&mut new_blob_cids);
308
-
cids_to_keep.append(&mut duplicated_cids);
309
-
310
-
let cids_to_delete = deleted_repo_blob_cids
321
+
// Determine which blobs to keep vs delete
322
+
let cids_to_keep: Vec<String> = [&new_blob_cids[..], &duplicated_cids[..]].concat();
323
+
let cids_to_delete: Vec<String> = deleted_repo_blob_cids
311
324
.into_iter()
312
-
.filter_map(|cid: String| match cids_to_keep.contains(&cid) {
313
-
true => None,
314
-
false => Some(cid),
315
-
})
316
-
.collect::<Vec<String>>();
325
+
.filter(|cid| !cids_to_keep.contains(cid))
326
+
.collect();
317
327
318
328
if cids_to_delete.is_empty() {
319
329
return Ok(());
320
330
}
321
331
322
-
let y = cids_to_delete.clone();
332
+
// Delete from the blob table
333
+
let cids = cids_to_delete.clone();
334
+
let did_clone = self.did.clone();
323
335
self.db
324
336
.run(move |conn| {
325
337
delete(BlobSchema::blob)
326
-
.filter(BlobSchema::cid.eq_any(&y))
338
+
.filter(BlobSchema::cid.eq_any(cids))
339
+
.filter(BlobSchema::did.eq(did_clone))
327
340
.execute(conn)
328
341
})
329
342
.await?;
330
343
331
344
// Delete from blob storage
345
+
// Ideally we'd use a background queue here, but for now:
332
346
let _ = stream::iter(cids_to_delete)
333
-
.then(|cid| async { self.blobstore.delete(cid).await })
347
+
.then(|cid| async move {
348
+
match Cid::from_str(&cid) {
349
+
Ok(cid) => self.blobstore.delete(cid.to_string()).await,
350
+
Err(e) => Err(anyhow::Error::new(e)),
351
+
}
352
+
})
334
353
.collect::<Vec<_>>()
335
354
.await
336
355
.into_iter()
···
451
470
bail!("Limit too high. Max: 1000.");
452
471
}
453
472
454
-
let res: Vec<models::RecordBlob> = if let Some(cursor) = cursor {
455
-
RecordBlobSchema::record_blob
456
-
.limit(limit as i64)
457
-
.filter(not(exists(
458
-
BlobSchema::blob
459
-
.filter(BlobSchema::cid.eq(RecordBlobSchema::blobCid))
460
-
.filter(BlobSchema::did.eq(&did))
461
-
.select(models::Blob::as_select()),
462
-
)))
463
-
.filter(RecordBlobSchema::blobCid.gt(cursor))
464
-
.filter(RecordBlobSchema::did.eq(&did))
465
-
.select(models::RecordBlob::as_select())
466
-
.order(RecordBlobSchema::blobCid.asc())
467
-
.distinct_on(RecordBlobSchema::blobCid)
468
-
.get_results(conn)?
473
+
// TODO: Improve this query
474
+
475
+
// SQLite doesn't support DISTINCT ON, so we use GROUP BY instead
476
+
let query = RecordBlobSchema::record_blob
477
+
.filter(not(exists(
478
+
BlobSchema::blob
479
+
.filter(BlobSchema::cid.eq(RecordBlobSchema::blobCid))
480
+
.filter(BlobSchema::did.eq(&did)),
481
+
)))
482
+
.filter(RecordBlobSchema::did.eq(&did))
483
+
.into_boxed();
484
+
485
+
// Apply cursor filtering if provided
486
+
let query = if let Some(cursor) = cursor {
487
+
query.filter(RecordBlobSchema::blobCid.gt(cursor))
469
488
} else {
470
-
RecordBlobSchema::record_blob
471
-
.limit(limit as i64)
472
-
.filter(not(exists(
473
-
BlobSchema::blob
474
-
.filter(BlobSchema::cid.eq(RecordBlobSchema::blobCid))
475
-
.filter(BlobSchema::did.eq(&did))
476
-
.select(models::Blob::as_select()),
477
-
)))
478
-
.filter(RecordBlobSchema::did.eq(&did))
479
-
.select(models::RecordBlob::as_select())
480
-
.order(RecordBlobSchema::blobCid.asc())
481
-
.distinct_on(RecordBlobSchema::blobCid)
482
-
.get_results(conn)?
489
+
query
483
490
};
484
491
485
-
Ok(res
486
-
.into_iter()
487
-
.map(|row| ListMissingBlobsRefRecordBlob {
488
-
cid: row.blob_cid,
489
-
record_uri: row.record_uri,
490
-
})
491
-
.collect())
492
+
// For SQLite, use a simplified approach without GROUP BY to avoid recursion limit issues
493
+
let res = query
494
+
.select((RecordBlobSchema::blobCid, RecordBlobSchema::recordUri))
495
+
.order(RecordBlobSchema::blobCid.asc())
496
+
.limit(limit as i64)
497
+
.load::<(String, String)>(conn)?;
498
+
499
+
// Process results to get distinct cids with their first record URI
500
+
let mut result = Vec::new();
501
+
let mut last_cid = None;
502
+
503
+
for (cid, uri) in res {
504
+
if last_cid.as_ref() != Some(&cid) {
505
+
result.push(ListMissingBlobsRefRecordBlob {
506
+
cid: cid.clone(),
507
+
record_uri: uri,
508
+
});
509
+
last_cid = Some(cid);
510
+
}
511
+
}
512
+
513
+
Ok(result)
492
514
})
493
515
.await
494
516
}
+1
-2
src/actor_store/record.rs
+1
-2
src/actor_store/record.rs
···
439
439
use rsky_pds::schema::pds::backlink::dsl as BacklinkSchema;
440
440
self.db
441
441
.run(move |conn| {
442
-
insert_into(BacklinkSchema::backlink)
442
+
insert_or_ignore_into(BacklinkSchema::backlink)
443
443
.values(&backlinks)
444
-
.on_conflict_do_nothing()
445
444
.execute(conn)?;
446
445
Ok(())
447
446
})
+1
-2
src/actor_store/sql_repo.rs
+1
-2
src/actor_store/sql_repo.rs