+230
src/actor_store/actor_store.rs
+230
src/actor_store/actor_store.rs
···
1
+
use std::path::PathBuf;
2
+
use std::sync::Arc;
3
+
4
+
use anyhow::Result;
5
+
use sqlx::SqlitePool;
6
+
7
+
use super::actor_store_reader::ActorStoreReader;
8
+
use super::actor_store_transactor::ActorStoreTransactor;
9
+
use super::actor_store_writer::ActorStoreWriter;
10
+
use super::resources::ActorStoreResources;
11
+
use crate::SigningKey;
12
+
13
+
/// Manages per-actor (per-DID) storage: an on-disk directory tree holding,
/// for each actor, a sqlite database and a signing-key file.
pub(crate) struct ActorStore {
    /// Root directory under which every actor's data lives.
    pub(crate) directory: PathBuf,
    /// Directory for keypairs reserved ahead of account creation.
    /// NOTE(review): only used by the still-commented-out reserve-key
    /// methods below; dead until those are ported.
    reserved_key_dir: PathBuf,
    /// Shared service resources injected by the caller.
    resources: ActorStoreResources,
}
18
+
19
+
/// On-disk layout for a single actor's data, as resolved by
/// [`ActorStore::get_location`].
///
/// `pub(crate)` (rather than private) because it appears in the return type
/// of the `pub(crate)` fn `get_location`; a private type there triggers the
/// `private_interfaces` lint and makes the value unusable by callers in
/// sibling modules.
pub(crate) struct ActorLocation {
    /// The actor's directory root.
    directory: PathBuf,
    /// Path of the actor's sqlite database file.
    db_location: PathBuf,
    /// Path of the actor's signing-key file.
    key_location: PathBuf,
}
24
+
25
+
impl ActorStore {
    /// Build a store rooted at `directory`. Reserved keypairs are kept in
    /// the `reserved_keys` subdirectory of that root.
    pub(crate) fn new(directory: impl Into<PathBuf>, resources: ActorStoreResources) -> Self {
        let directory = directory.into();
        let reserved_key_dir = directory.join("reserved_keys");
        Self {
            directory,
            reserved_key_dir,
            resources,
        }
    }

    /// Resolve the on-disk locations (directory, sqlite db, key file) for
    /// `did`. Not yet implemented; the commented block is the TypeScript
    /// reference implementation being ported.
    pub(crate) async fn get_location(&self, did: &str) -> Result<ActorLocation> {
        // const didHash = await crypto.sha256Hex(did)
        // const directory = path.join(this.cfg.directory, didHash.slice(0, 2), did)
        // const dbLocation = path.join(directory, `store.sqlite`)
        // const keyLocation = path.join(directory, `key`)
        // return { directory, dbLocation, keyLocation }
        todo!()
    }

    /// Whether an actor store (i.e. its sqlite db file) already exists for
    /// `did`. Not yet implemented.
    pub(crate) async fn exists(&self, did: &str) -> Result<bool> {
        // const location = await this.getLocation(did)
        // return await fileExists(location.dbLocation)
        todo!()
    }

    /// Load the actor's signing keypair from its on-disk key file.
    /// Not yet implemented.
    pub(crate) async fn keypair(&self, did: &str) -> Result<Arc<SigningKey>> {
        // const { keyLocation } = await this.getLocation(did)
        // const privKey = await fs.readFile(keyLocation)
        // return crypto.Secp256k1Keypair.import(privKey)
        todo!()
    }

    /// Open the actor's sqlite database, failing if it does not exist.
    /// Not yet implemented.
    pub(crate) async fn open_db(&self, did: &str) -> Result<SqlitePool> {
        // const { dbLocation } = await this.getLocation(did)
        // const exists = await fileExists(dbLocation)
        // if (!exists) {
        //   throw new InvalidRequestError('Repo not found', 'NotFound')
        // }

        // const db = getDb(dbLocation, this.cfg.disableWalAutoCheckpoint)

        // // run a simple select with retry logic to ensure the db is ready (not in wal recovery mode)
        // try {
        //   await retrySqlite(() =>
        //     db.db.selectFrom('repo_root').selectAll().execute(),
        //   )
        // } catch (err) {
        //   db.close()
        //   throw err
        // }

        // return db
        todo!()
    }

    /// Run a read-only closure against the actor's store.
    /// NOTE(review): `f` is unused until this is implemented — consider
    /// `_f` (or implementing) to silence the warning.
    pub(crate) async fn read<T, F>(&self, did: &str, f: F) -> Result<T>
    where
        F: FnOnce(ActorStoreReader) -> Result<T>,
    {
        // const db = await this.openDb(did)
        // try {
        //   const getKeypair = () => this.keypair(did)
        //   return await fn(new ActorStoreReader(did, db, this.resources, getKeypair))
        // } finally {
        //   db.close()
        // }
        todo!()
    }

    /// Run a closure inside a database transaction on the actor's store.
    /// NOTE(review): `f` is unused until this is implemented.
    pub(crate) async fn transact<T, F>(&self, did: &str, f: F) -> Result<T>
    where
        F: FnOnce(ActorStoreTransactor) -> Result<T>,
    {
        // const keypair = await this.keypair(did)
        // const db = await this.openDb(did)
        // try {
        //   return await db.transaction((dbTxn) => {
        //     return fn(new ActorStoreTransactor(did, dbTxn, keypair, this.resources))
        //   })
        // } finally {
        //   db.close()
        // }
        todo!()
    }

    /// Run a writing closure without wrapping it in a transaction.
    /// NOTE(review): `f` is unused until this is implemented.
    pub(crate) async fn write_no_transaction<T, F>(&self, did: &str, f: F) -> Result<T>
    where
        F: FnOnce(ActorStoreWriter) -> Result<T>,
    {
        // const keypair = await this.keypair(did)
        // const db = await this.openDb(did)
        // try {
        //   return await fn(new ActorStoreWriter(did, db, keypair, this.resources))
        // } finally {
        //   db.close()
        // }
        todo!()
    }

    /// Create a new actor store: directory, key file, and a migrated
    /// sqlite database. Fails if the repo already exists.
    /// Not yet implemented.
    pub(crate) async fn create(&self, did: &str, keypair: SigningKey) -> Result<()> {
        // const { directory, dbLocation, keyLocation } = await this.getLocation(did)
        // // ensure subdir exists
        // await mkdir(directory, { recursive: true })
        // const exists = await fileExists(dbLocation)
        // if (exists) {
        //   throw new InvalidRequestError('Repo already exists', 'AlreadyExists')
        // }
        // const privKey = await keypair.export()
        // await fs.writeFile(keyLocation, privKey)

        // const db: ActorDb = getDb(dbLocation, this.cfg.disableWalAutoCheckpoint)
        // try {
        //   await db.ensureWal()
        //   const migrator = getMigrator(db)
        //   await migrator.migrateToLatestOrThrow()
        // } finally {
        //   db.close()
        // }
        todo!()
    }

    /// Delete an actor's blobs and remove its directory tree.
    /// Not yet implemented.
    pub(crate) async fn destroy(&self, did: &str) -> Result<()> {
        // const blobstore = this.resources.blobstore(did)
        // if (blobstore instanceof DiskBlobStore) {
        //   await blobstore.deleteAll()
        // } else {
        //   const cids = await this.read(did, async (store) =>
        //     store.repo.blob.getBlobCids(),
        //   )
        //   await Promise.allSettled(
        //     chunkArray(cids, 500).map((chunk) => blobstore.deleteMany(chunk)),
        //   )
        // }

        // const { directory } = await this.getLocation(did)
        // await rmIfExists(directory, true)
        todo!()
    }

    // TypeScript reference for methods not yet ported (they use
    // `reserved_key_dir` above):
    // async reserveKeypair(did?: string): Promise<string> {
    //   let keyLoc: string | undefined
    //   if (did) {
    //     assertSafePathPart(did)
    //     keyLoc = path.join(this.reservedKeyDir, did)
    //     const maybeKey = await loadKey(keyLoc)
    //     if (maybeKey) {
    //       return maybeKey.did()
    //     }
    //   }
    //   const keypair = await crypto.Secp256k1Keypair.create({ exportable: true })
    //   const keyDid = keypair.did()
    //   keyLoc = keyLoc ?? path.join(this.reservedKeyDir, keyDid)
    //   await mkdir(this.reservedKeyDir, { recursive: true })
    //   await fs.writeFile(keyLoc, await keypair.export())
    //   return keyDid
    // }

    // async getReservedKeypair(
    //   signingKeyOrDid: string,
    // ): Promise<ExportableKeypair | undefined> {
    //   return loadKey(path.join(this.reservedKeyDir, signingKeyOrDid))
    // }

    // async clearReservedKeypair(keyDid: string, did?: string) {
    //   await rmIfExists(path.join(this.reservedKeyDir, keyDid))
    //   if (did) {
    //     await rmIfExists(path.join(this.reservedKeyDir, did))
    //   }
    // }

    // async storePlcOp(did: string, op: Uint8Array) {
    //   const { directory } = await this.getLocation(did)
    //   const opLoc = path.join(directory, `did-op`)
    //   await fs.writeFile(opLoc, op)
    // }

    // async getPlcOp(did: string): Promise<Uint8Array> {
    //   const { directory } = await this.getLocation(did)
    //   const opLoc = path.join(directory, `did-op`)
    //   return await fs.readFile(opLoc)
    // }

    // async clearPlcOp(did: string) {
    //   const { directory } = await this.getLocation(did)
    //   const opLoc = path.join(directory, `did-op`)
    //   await rmIfExists(opLoc)
    // }
}
214
+
215
+
// const loadKey = async (loc: string): Promise<ExportableKeypair | undefined> => {
216
+
// const privKey = await readIfExists(loc)
217
+
// if (!privKey) return undefined
218
+
// return crypto.Secp256k1Keypair.import(privKey, { exportable: true })
219
+
// }
220
+
221
+
// function assertSafePathPart(part: string) {
222
+
// const normalized = path.normalize(part)
223
+
// assert(
224
+
// part === normalized &&
225
+
// !part.startsWith('.') &&
226
+
// !part.includes('/') &&
227
+
// !part.includes('\\'),
228
+
// `unsafe path part: ${part}`,
229
+
// )
230
+
// }
+44
src/actor_store/actor_store_reader.rs
+44
src/actor_store/actor_store_reader.rs
···
1
+
use std::sync::Arc;
2
+
3
+
use sqlx::SqlitePool;
4
+
5
+
use super::{
6
+
ActorStoreTransactor, db::ActorDb, preference::PreferenceReader, record::RecordReader,
7
+
repo::RepoReader, resources::ActorStoreResources,
8
+
};
9
+
use crate::SigningKey;
10
+
11
+
/// Read-only view over one actor's store (repo, records, preferences).
///
/// NOTE(review): the `impl` below references `self.did`, `self.db`,
/// `self.resources`, and `self.keypair()`, none of which exist on this
/// struct — the field list is likely incomplete and will not compile as-is.
pub(crate) struct ActorStoreReader {
    /// Repository-level reads.
    pub(crate) repo: RepoReader,
    /// Record-level reads.
    pub(crate) record: RecordReader,
    /// Preference reads.
    pub(crate) pref: PreferenceReader,
}
16
+
17
+
impl ActorStoreReader {
    /// Build a reader for `did` over the given database handle.
    ///
    /// NOTE(review): `Result` is used here but `anyhow::Result` is not in
    /// this file's imports — add the `use` or this will not compile.
    pub(crate) fn new(
        did: String,
        db: ActorDb,
        resources: ActorStoreResources,
        keypair: impl Fn() -> Result<Arc<SigningKey>>,
    ) -> Self {
        let blobstore = resources.blobstore(&did);

        let repo = RepoReader::new(db.clone(), blobstore);
        let record = RecordReader::new(db.clone());
        let pref = PreferenceReader::new(db);
        // NOTE(review): the closure's result is discarded — presumably this
        // was meant to be stored on the struct (TS keeps a `getKeypair`
        // callback) or validated; confirm intent.
        keypair();

        Self { repo, record, pref }
    }

    /// Promote this reader to a transactor running inside a db transaction.
    ///
    /// NOTE(review): `self.keypair()`, `self.db`, `self.did`, and
    /// `self.resources` are not defined on `ActorStoreReader` — this
    /// method cannot compile until those fields/accessors are added.
    pub(crate) async fn transact<T, F>(&self, f: F) -> Result<T>
    where
        F: FnOnce(ActorStoreTransactor) -> Result<T>,
    {
        let keypair = self.keypair();
        let db_txn = self.db.transaction().await?;
        let store =
            ActorStoreTransactor::new(self.did.clone(), db_txn, keypair, self.resources.clone());
        f(store)
    }
}
+29
src/actor_store/actor_store_transactor.rs
+29
src/actor_store/actor_store_transactor.rs
···
1
+
use std::sync::Arc;
2
+
3
+
use sqlx::SqlitePool;
4
+
5
+
use super::resources::ActorStoreResources;
6
+
use crate::SigningKey;
7
+
8
+
/// Mutable view over one actor's store, intended to run inside a
/// database transaction.
pub(crate) struct ActorStoreTransactor {
    /// DID of the actor this transactor operates on.
    pub(crate) did: String,
    /// Database handle the writes run against.
    pub(crate) db: SqlitePool,
    /// The actor's signing keypair, shared via `Arc`.
    pub(crate) keypair: Arc<SigningKey>,
    /// Shared service resources.
    pub(crate) resources: ActorStoreResources,
}
14
+
15
+
impl ActorStoreTransactor {
16
+
pub(crate) fn new(
17
+
did: String,
18
+
db: SqlitePool,
19
+
keypair: Arc<SigningKey>,
20
+
resources: ActorStoreResources,
21
+
) -> Self {
22
+
Self {
23
+
did,
24
+
db,
25
+
keypair,
26
+
resources,
27
+
}
28
+
}
29
+
}
+13
src/actor_store/actor_store_writer.rs
+13
src/actor_store/actor_store_writer.rs
···
1
+
use std::sync::Arc;
2
+
3
+
use sqlx::SqlitePool;
4
+
5
+
use super::resources::ActorStoreResources;
6
+
use crate::SigningKey;
7
+
8
+
/// Mutable view over one actor's store for writes performed *outside*
/// of a transaction (cf. `ActorStoreTransactor`).
pub(crate) struct ActorStoreWriter {
    /// DID of the actor this writer operates on.
    pub(crate) did: String,
    /// Database handle the writes run against.
    pub(crate) db: SqlitePool,
    /// The actor's signing keypair, shared via `Arc`.
    pub(crate) keypair: Arc<SigningKey>,
    /// Shared service resources.
    pub(crate) resources: ActorStoreResources,
}
+3
src/actor_store/blob/mod.rs
+3
src/actor_store/blob/mod.rs
+103
-121
src/actor_store/blob/reader.rs
+103
-121
src/actor_store/blob/reader.rs
···
1
1
//! Blob reading functionality.
2
2
3
+
use std::str::FromStr;
4
+
3
5
use anyhow::{Context as _, Result};
6
+
use atrium_api::com::atproto::admin::defs::StatusAttrData;
4
7
use atrium_repo::Cid;
5
8
use sqlx::{Row, SqlitePool};
6
9
7
-
use crate::config::BlobConfig;
8
-
9
10
/// Reader for blob data in the actor store.
10
-
pub(super) struct BlobReader {
11
+
pub(crate) struct BlobReader {
    /// Database connection.
    pub db: SqlitePool,
    /// Blob storage backend.
    /// NOTE(review): `BlobStore` is not imported in this file — add the
    /// corresponding `use` or this module will not compile.
    pub blobstore: BlobStore,
}
18
17
19
18
impl BlobReader {
20
19
/// Create a new blob reader.
21
-
pub(super) fn new(db: SqlitePool, config: BlobConfig, did: String) -> Self {
22
-
Self { db, config, did }
20
+
pub(crate) fn new(db: SqlitePool, blobstore: BlobStore) -> Self {
21
+
Self { db, blobstore }
23
22
}
24
23
25
24
/// Get metadata for a blob.
26
-
pub(super) async fn get_blob_metadata(&self, cid: &Cid) -> Result<Option<BlobMetadata>> {
25
+
pub(crate) async fn get_blob_metadata(&self, cid: &Cid) -> Result<Option<BlobMetadata>> {
27
26
let cid_str = cid.to_string();
28
-
let result = sqlx::query!(
29
-
r#"SELECT size, mimeType, takedownRef FROM blob WHERE cid = ?"#,
27
+
let found = sqlx::query!(
28
+
r#"
29
+
SELECT mimeType, size, takedownRef
30
+
FROM blob
31
+
WHERE cid = ? AND takedownRef IS NULL
32
+
"#,
30
33
cid_str
31
34
)
32
35
.fetch_optional(&self.db)
33
36
.await
34
37
.context("failed to fetch blob metadata")?;
35
-
36
-
match result {
37
-
Some(row) => Ok(Some(BlobMetadata {
38
-
cid: cid.clone(),
39
-
size: row.size as u64,
40
-
mime_type: row.mimeType,
41
-
takedown_ref: row.takedownRef,
42
-
})),
43
-
None => Ok(None),
38
+
if found.is_none() {
39
+
return Err(anyhow::anyhow!("Blob not found")); // InvalidRequestError('Blob not found')
44
40
}
41
+
let found = found.unwrap();
42
+
let size = found.size as u64;
43
+
let mime_type = found.mimeType;
44
+
return Ok(Some(BlobMetadata { size, mime_type }));
45
45
}
46
46
47
47
/// Get a blob's full data and metadata.
48
-
pub(super) async fn get_blob(&self, cid: &Cid) -> Result<Option<BlobData>> {
49
-
// First check the metadata
50
-
let metadata = match self.get_blob_metadata(cid).await? {
51
-
Some(meta) => meta,
52
-
None => return Ok(None),
53
-
};
54
-
55
-
// If there's a takedown, return metadata only with no content
56
-
if metadata.takedown_ref.is_some() {
57
-
return Ok(Some(BlobData {
58
-
metadata,
59
-
content: None,
60
-
}));
61
-
}
62
-
63
-
// Get the blob file path
64
-
let blob_path = self.config.path.join(format!("{}.blob", cid));
65
-
66
-
// Check if file exists
67
-
if !blob_path.exists() {
68
-
return Ok(None);
48
+
pub(crate) async fn get_blob(&self, cid: &Cid) -> Result<Option<BlobData>> {
49
+
let metadata = self.get_blob_metadata(cid).await?;
50
+
let blob_stream = self.blobstore.get_stream(cid).await?;
51
+
if blob_stream.is_none() {
52
+
return Err(anyhow::anyhow!("Blob not found")); // InvalidRequestError('Blob not found')
69
53
}
70
-
71
-
// Read the file
72
-
let content = tokio::fs::read(&blob_path)
73
-
.await
74
-
.context("failed to read blob file")?;
75
-
54
+
let metadata = metadata.unwrap();
76
55
Ok(Some(BlobData {
77
-
metadata,
78
-
content: Some(content),
56
+
size: metadata.size,
57
+
mime_type: Some(metadata.mime_type),
58
+
stream: blob_stream.unwrap(),
79
59
}))
80
60
}
81
61
82
62
/// List blobs for a repository.
83
-
pub(super) async fn list_blobs(&self, opts: ListBlobsOptions) -> Result<Vec<String>> {
84
-
let mut query = sqlx::QueryBuilder::new("SELECT cid FROM blob");
85
-
86
-
// Add filters for since revision
63
+
pub(crate) async fn list_blobs(&self, opts: ListBlobsOptions) -> Result<Vec<String>> {
64
+
let mut query = sqlx::QueryBuilder::new(
65
+
"SELECT rb.blobCid FROM record_blob rb
66
+
INNER JOIN record r ON r.uri = rb.recordUri",
67
+
);
87
68
if let Some(since) = &opts.since {
88
-
query
89
-
.push(" WHERE EXISTS (")
90
-
.push("SELECT 1 FROM record_blob rb JOIN record r ON rb.recordUri = r.uri")
91
-
.push(" WHERE rb.blobCid = blob.cid AND r.repoRev > ")
92
-
.push_bind(since)
93
-
.push(")");
69
+
query.push(" WHERE r.repoRev > ").push_bind(since);
94
70
}
95
-
96
-
// Add cursor pagination
97
71
if let Some(cursor) = &opts.cursor {
98
-
if opts.since.is_some() {
99
-
query.push(" AND");
100
-
} else {
101
-
query.push(" WHERE");
102
-
}
103
-
query.push(" cid > ").push_bind(cursor);
72
+
query.push(" AND rb.blobCid > ").push_bind(cursor);
104
73
}
105
-
106
-
// Add order and limit
107
74
query
108
-
.push(" ORDER BY cid ASC")
75
+
.push(" ORDER BY rb.blobCid ASC")
109
76
.push(" LIMIT ")
110
77
.push_bind(opts.limit);
111
-
112
-
// Execute query
113
78
let blobs = query
114
79
.build()
115
80
.map(|row: sqlx::sqlite::SqliteRow| row.get::<String, _>(0))
116
81
.fetch_all(&self.db)
117
82
.await
118
-
.context("failed to list blobs")?;
119
-
83
+
.context("failed to fetch blobs")?;
120
84
Ok(blobs)
121
85
}
122
86
123
87
/// Get takedown status for a blob.
124
-
pub(super) async fn get_blob_takedown_status(&self, cid: &Cid) -> Result<Option<String>> {
88
+
pub(crate) async fn get_blob_takedown_status(
89
+
&self,
90
+
cid: &Cid,
91
+
) -> Result<Option<StatusAttrData>> {
92
+
// const res = await this.db.db
93
+
// .selectFrom('blob')
94
+
// .select('takedownRef')
95
+
// .where('cid', '=', cid.toString())
96
+
// .executeTakeFirst()
97
+
// if (!res) return null
98
+
// return res.takedownRef
99
+
// ? { applied: true, ref: res.takedownRef }
100
+
// : { applied: false }
125
101
let cid_str = cid.to_string();
126
102
let result = sqlx::query!(r#"SELECT takedownRef FROM blob WHERE cid = ?"#, cid_str)
127
103
.fetch_optional(&self.db)
128
104
.await
129
105
.context("failed to fetch blob takedown status")?;
130
106
131
-
Ok(result.and_then(|row| row.takedownRef))
107
+
Ok({
108
+
if result.is_none() {
109
+
None
110
+
} else {
111
+
let takedown_ref = result.unwrap().takedownRef.unwrap();
112
+
if takedown_ref == "false" {
113
+
Some(StatusAttrData {
114
+
applied: false,
115
+
r#ref: None,
116
+
})
117
+
} else {
118
+
Some(StatusAttrData {
119
+
applied: true,
120
+
r#ref: Some(takedown_ref),
121
+
})
122
+
}
123
+
}
124
+
})
132
125
}
133
126
134
127
/// Get records that reference a blob.
135
-
pub(super) async fn get_records_for_blob(&self, cid: &Cid) -> Result<Vec<String>> {
128
+
pub(crate) async fn get_records_for_blob(&self, cid: &Cid) -> Result<Vec<String>> {
136
129
let cid_str = cid.to_string();
137
130
let records = sqlx::query!(
138
131
r#"SELECT recordUri FROM record_blob WHERE blobCid = ?"#,
···
146
139
}
147
140
148
141
    /// Return the CIDs of all blobs referenced by the record at
    /// `record_uri` (joined through the `record_blob` table, so only
    /// blobs that actually exist in `blob` are returned).
    pub(crate) async fn get_blobs_for_record(&self, record_uri: &str) -> Result<Vec<String>> {
        let blobs = sqlx::query!(
            r#"SELECT blob.cid FROM blob INNER JOIN record_blob ON record_blob.blobCid = blob.cid WHERE recordUri = ?"#,
            record_uri
        )
        .fetch_all(&self.db)
        .await
        .context("failed to fetch blobs for record")?;

        Ok(blobs.into_iter().map(|blob| blob.cid).collect())
    }
160
160
161
161
/// Count total blobs.
162
-
pub(super) async fn blob_count(&self) -> Result<i64> {
162
+
pub(crate) async fn blob_count(&self) -> Result<i64> {
163
163
let result = sqlx::query!(r#"SELECT COUNT(*) as count FROM blob"#)
164
164
.fetch_one(&self.db)
165
165
.await
···
169
169
}
170
170
171
171
/// Count distinct blobs referenced by records.
172
-
pub(super) async fn record_blob_count(&self) -> Result<i64> {
172
+
pub(crate) async fn record_blob_count(&self) -> Result<i64> {
173
173
let result = sqlx::query!(r#"SELECT COUNT(DISTINCT blobCid) as count FROM record_blob"#)
174
174
.fetch_one(&self.db)
175
175
.await
···
179
179
}
180
180
181
181
/// List blobs that are referenced but missing from storage.
182
-
pub(super) async fn list_missing_blobs(
182
+
pub(crate) async fn list_missing_blobs(
183
183
&self,
184
184
opts: ListMissingBlobsOptions,
185
185
) -> Result<Vec<MissingBlob>> {
···
212
212
Ok(missing)
213
213
}
214
214
215
-
/// Register a new blob in the database (without file storage)
216
-
pub(super) async fn register_blob(
217
-
&self,
218
-
cid: String,
219
-
mime_type: String,
220
-
size: i64,
221
-
) -> Result<()> {
222
-
let now = chrono::Utc::now().to_rfc3339();
223
-
sqlx::query!(
224
-
r#"
225
-
INSERT INTO blob (cid, mimeType, size, createdAt)
226
-
VALUES (?, ?, ?, ?)
227
-
ON CONFLICT DO NOTHING
228
-
"#,
229
-
cid,
230
-
mime_type,
231
-
size,
232
-
now
233
-
)
234
-
.execute(&self.db)
235
-
.await
236
-
.context("failed to register blob")?;
237
-
238
-
Ok(())
215
+
pub(crate) async fn get_blod_cids(&self) -> Result<Vec<Cid>> {
216
+
let rows = sqlx::query!("SELECT cid FROM blob")
217
+
.fetch_all(&self.db)
218
+
.await
219
+
.context("failed to fetch blob CIDs")?;
220
+
Ok(rows
221
+
.into_iter()
222
+
.map(|row| Cid::from_str(&row.cid).unwrap())
223
+
.collect())
239
224
}
240
225
}
241
226
242
227
/// Metadata about a stored blob (the non-taken-down row in `blob`).
#[derive(Debug, Clone)]
pub(crate) struct BlobMetadata {
    /// The size of the blob in bytes.
    pub size: u64,
    /// The MIME type of the blob.
    pub mime_type: String,
}
254
235
255
236
/// A blob's size, MIME type, and a stream over its content.
#[derive(Debug)]
pub(crate) struct BlobData {
    /// The size of the blob.
    pub size: u64,
    /// The MIME type of the blob.
    pub mime_type: Option<String>,
    /// Streamed blob content.
    /// NOTE(review): `BlobStream` is not imported in this file — confirm
    /// the type path and add the `use`.
    pub stream: BlobStream,
}
263
245
264
246
/// Options for listing blobs.
265
247
#[derive(Debug, Clone)]
266
-
pub(super) struct ListBlobsOptions {
248
+
pub(crate) struct ListBlobsOptions {
267
249
/// Optional revision to list blobs since.
268
250
pub since: Option<String>,
269
251
/// Optional cursor for pagination.
···
274
256
275
257
/// Options for listing missing blobs.
276
258
#[derive(Debug, Clone)]
277
-
pub(super) struct ListMissingBlobsOptions {
259
+
pub(crate) struct ListMissingBlobsOptions {
278
260
/// Optional cursor for pagination.
279
261
pub cursor: Option<String>,
280
262
/// Maximum number of missing blobs to return.
···
283
265
284
266
/// Information about a missing blob.
285
267
#[derive(Debug, Clone)]
286
-
pub(super) struct MissingBlob {
268
+
pub(crate) struct MissingBlob {
287
269
/// CID of the missing blob.
288
270
pub cid: String,
289
271
/// URI of the record referencing the missing blob.
+179
-195
src/actor_store/blob/transactor.rs
+179
-195
src/actor_store/blob/transactor.rs
···
1
1
//! Blob transaction functionality.
2
2
3
3
use anyhow::{Context as _, Result};
4
-
use atrium_api::types::{Blob, CidLink};
4
+
use atrium_api::{
5
+
com::atproto::admin::defs::StatusAttr,
6
+
types::{Blob, CidLink},
7
+
};
5
8
use atrium_repo::Cid;
6
9
use sha2::Digest;
7
10
use sqlx::{Row, SqlitePool};
···
9
12
use tokio::fs;
10
13
use uuid::Uuid;
11
14
12
-
use super::reader::BlobReader;
13
-
use crate::config::BlobConfig;
15
+
use super::BlobReader;
16
+
use crate::repo::types::{PreparedBlobRef, PreparedWrite, WriteOpAction};
14
17
15
18
/// Blob metadata for a newly uploaded blob.
16
19
#[derive(Debug, Clone)]
17
-
pub(super) struct BlobMetadata {
20
+
pub(crate) struct BlobMetadata {
18
21
/// Temporary key for the blob during upload.
19
22
pub temp_key: String,
20
23
/// Size of the blob in bytes.
···
30
33
}
31
34
32
35
/// Transactor for blob operations.
pub(crate) struct BlobTransactor {
    /// The blob reader (owns the db handle and blob store).
    pub reader: BlobReader,
    /// Queue for deferred work.
    /// NOTE(review): `BackgroundQueue` is not imported in this file, and
    /// nothing visible here enqueues to it yet — confirm both.
    pub background_queue: BackgroundQueue,
}
41
41
42
42
impl BlobTransactor {
43
43
    /// Create a new blob transactor over the given database pool, blob
    /// store, and background queue.
    /// NOTE(review): `BlobStore` and `BackgroundQueue` are not imported in
    /// this file — add the corresponding `use` lines.
    pub(crate) fn new(
        db: SqlitePool,
        blob_store: BlobStore,
        background_queue: BackgroundQueue,
    ) -> Self {
        Self {
            reader: BlobReader::new(db, blob_store),
            background_queue,
        }
    }
51
54
52
55
/// Register blob associations with records.
53
-
pub(super) async fn insert_blobs(&self, record_uri: &str, blobs: &[Blob]) -> Result<()> {
56
+
pub(crate) async fn insert_blobs(&self, record_uri: &str, blobs: &[Blob]) -> Result<()> {
54
57
if blobs.is_empty() {
55
58
return Ok(());
56
59
}
···
84
87
}
85
88
86
89
    /// Upload a blob to temporary storage and compute its metadata
    /// (size, sha256-derived CID, sniffed MIME type, image dimensions).
    ///
    /// NOTE(review): `stream_size`, `sha256_stream`, `img::maybe_get_info`,
    /// `mime_type_from_stream`, and `sha256_raw_to_cid` are not defined or
    /// imported in this file — this will not compile until they exist.
    /// Also, `blob_stream` is a `&[u8]`, not a stream; if true streaming is
    /// intended the signature needs to change (a slice can be re-read, a
    /// stream cannot).
    pub(crate) async fn upload_blob_and_get_metadata(
        &self,
        user_suggested_mime: &str,
        blob_stream: &[u8],
    ) -> Result<BlobMetadata> {
        let temp_key = self.reader.blobstore.put_temp(blob_stream).await?;
        let size = stream_size(blob_stream).await?;
        let sha256 = sha256_stream(blob_stream).await?;
        let img_info = img::maybe_get_info(blob_stream).await?;
        let sniffed_mime = mime_type_from_stream(blob_stream).await?;
        let cid = sha256_raw_to_cid(sha256);
        // Prefer the sniffed type; fall back to what the uploader claimed.
        let mime_type = sniffed_mime.unwrap_or_else(|| user_suggested_mime.to_string());
        Ok(BlobMetadata {
            temp_key,
            size,
            cid,
            mime_type,
            // NOTE(review): the first `.map` moves `img_info`, so the second
            // use is a use-after-move unless the info type is `Copy` — use
            // `img_info.as_ref().map(...)` (or destructure once) to be safe.
            width: img_info.map(|info| info.width),
            height: img_info.map(|info| info.height),
        })
    }
127
111
128
112
/// Track a new blob that's not yet associated with a record.
129
-
pub(super) async fn track_untethered_blob(&self, metadata: &BlobMetadata) -> Result<Blob> {
113
+
pub(crate) async fn track_untethered_blob(&self, metadata: &BlobMetadata) -> Result<Blob> {
130
114
let cid_str = metadata.cid.to_string();
131
115
132
116
// Check if blob exists and is taken down
···
178
162
}
179
163
180
164
    /// Process blobs for a repository write operation: first drop blob
    /// references left behind by deletes/updates, then verify and
    /// permanently store the blobs attached to creates/updates and link
    /// them to their records.
    ///
    /// NOTE(review): this does not compile as written —
    /// 1. `writes` is moved into `delete_dereferenced_blobs(...)` and then
    ///    used again by the `for` loop (use after move); and
    /// 2. `delete_dereferenced_blobs` is declared with a second
    ///    `skip_blob_store: bool` parameter that is not passed here.
    /// Fixing requires either passing `&writes`/a clone or changing the
    /// callee's signature — coordinate with `delete_dereferenced_blobs`.
    pub(crate) async fn process_write_blobs(&self, writes: Vec<PreparedWrite>) -> Result<()> {
        self.delete_dereferenced_blobs(writes)
            .await
            .context("failed to delete dereferenced blobs")?;
        for write in writes {
            // NOTE(review): field access (`write.action`) here vs method
            // access (`w.action()`) in `delete_dereferenced_blobs` — only
            // one can match `PreparedWrite`'s API; make them consistent.
            if write.action == WriteOpAction::Create || write.action == WriteOpAction::Update {
                for blob in &write.blobs {
                    self.verify_blob_and_make_permanent(blob)
                        .await
                        .context("failed to verify and make blob permanent")?;
                    self.associate_blob(&blob.r#ref.0.to_string(), &write.uri)
                        .await
                        .context("failed to associate blob with record")?;
                }
            }
        }
        Ok(())
    }
183
+
184
+
    /// Update the takedown status of a blob.
    pub(crate) async fn update_blob_takedown_status(
        &self,
        blob: &Blob,
        takedown: &StatusAttr,
    ) -> Result<()> {
        // When a takedown is applied without an explicit reference, mint a
        // fresh UUID so the row always carries a non-NULL marker; clearing
        // a takedown stores NULL.
        let takedown_ref = if takedown.applied {
            Some(
                takedown
                    .r#ref
                    .clone()
                    .unwrap_or_else(|| Uuid::new_v4().to_string()),
            )
        } else {
            None
        };

        let cid_str = blob.r#ref.0.to_string();
        sqlx::query!(
            r#"UPDATE blob SET takedownRef = ? WHERE cid = ?"#,
            takedown_ref,
            cid_str
        )
        .execute(&self.reader.db)
        .await
        .context("failed to update blob takedown status")?;

        Ok(())
    }
198
213
199
214
    /// Delete blobs that are no longer referenced by any record.
    ///
    /// NOTE(review): several problems to resolve before this can work —
    /// 1. The `IN (?1)` queries bind `join(",")` as ONE string parameter,
    ///    not a list; they match nothing unless exactly one value is
    ///    passed. Build the `IN` list with `sqlx::QueryBuilder` and
    ///    per-value `push_bind` (as `list_blobs` does in the reader).
    /// 2. The `todo!()` inside the `filter_map` closure panics at runtime
    ///    for any create/update write and leaves the closure's return type
    ///    ambiguous for inference.
    /// 3. The only visible caller (`process_write_blobs`) passes a single
    ///    argument — the `skip_blob_store` parameter is missing there.
    pub(crate) async fn delete_dereferenced_blobs(
        &self,
        writes: Vec<PreparedWrite>,
        skip_blob_store: bool,
    ) -> Result<()> {
        // URIs touched by deletes and updates lose their old blob refs.
        let deletes = writes
            .iter()
            .filter(|w| w.action() == &WriteOpAction::Delete)
            .collect::<Vec<_>>();
        let updates = writes
            .iter()
            .filter(|w| w.action() == &WriteOpAction::Update)
            .collect::<Vec<_>>();
        let uris: Vec<String> = deletes
            .iter()
            .chain(updates.iter())
            .map(|w| w.uri().to_string())
            .collect();

        if uris.is_empty() {
            return Ok(());
        }

        // Drop the record→blob links for those URIs. See NOTE(review) #1
        // about the IN-clause binding.
        let deleted_repo_blobs = sqlx::query!(
            r#"DELETE FROM record_blob WHERE recordUri IN (?1) RETURNING *"#,
            uris.join(",")
        )
        .fetch_all(&self.reader.db)
        .await
        .context("failed to delete dereferenced blobs")?;

        if deleted_repo_blobs.is_empty() {
            return Ok(());
        }

        // Get the CIDs of the deleted blobs
        let deleted_repo_blob_cids: Vec<String> = deleted_repo_blobs
            .iter()
            .map(|row| row.blobCid.clone())
            .collect();

        // Runs AFTER the DELETE above, so any remaining rows are other
        // records still referencing the same blob — those must be kept.
        let duplicate_cids = sqlx::query!(
            r#"SELECT blobCid FROM record_blob WHERE blobCid IN (?1)"#,
            deleted_repo_blob_cids.join(",")
        )
        .fetch_all(&self.reader.db)
        .await
        .context("failed to fetch duplicate CIDs")?;

        // Blobs attached to this batch's creates/updates must also be kept.
        // See NOTE(review) #2 — the body is not implemented yet.
        let new_blob_cids: Vec<String> = writes
            .iter()
            .filter_map(|w| {
                if w.action() == &WriteOpAction::Create || w.action() == &WriteOpAction::Update {
                    todo!()
                    // Some(w.blobs().iter().map(|b| b.r#ref.0.to_string()).collect())
                } else {
                    None
                }
            })
            .flatten()
            .collect();

        // Determine which CIDs to keep and which to delete
        let cids_to_keep: Vec<String> = new_blob_cids
            .into_iter()
            .chain(duplicate_cids.into_iter().map(|row| row.blobCid))
            .collect();
        let cids_to_delete: Vec<String> = deleted_repo_blob_cids
            .into_iter()
            .filter(|cid| !cids_to_keep.contains(cid))
            .collect();
        if cids_to_delete.is_empty() {
            return Ok(());
        }
        // Delete the now-unreferenced rows from the blob table.
        // See NOTE(review) #1 about the IN-clause binding.
        sqlx::query!(
            r#"DELETE FROM blob WHERE cid IN (?1)"#,
            cids_to_delete.join(",")
        )
        .execute(&self.reader.db)
        .await
        .context("failed to delete dereferenced blobs from blob table")?;
        // Optionally delete blobs from the blob store
        if !skip_blob_store {
            todo!();
        }
        Ok(())
    }
280
306
281
307
/// Verify a blob's integrity and move it from temporary to permanent storage.
282
-
pub(super) async fn verify_blob_and_make_permanent(&self, blob: &Blob) -> Result<()> {
283
-
let cid_str = blob.r#ref.0.to_string();
284
-
285
-
// Get blob from database
286
-
let row = sqlx::query!(
287
-
r#"SELECT * FROM blob WHERE cid = ? AND takedownRef IS NULL"#,
288
-
cid_str
289
-
)
290
-
.fetch_optional(&self.reader.db)
291
-
.await
292
-
.context("failed to find blob")?;
293
-
294
-
let row = match row {
295
-
Some(r) => r,
296
-
None => return Err(anyhow::anyhow!("Could not find blob: {}", cid_str)),
297
-
};
298
-
299
-
// Verify size constraint
300
-
if (row.size as u64) > self.reader.config.limit {
301
-
return Err(anyhow::anyhow!(
302
-
"Blob is too large. Size is {} bytes but maximum allowed is {} bytes",
303
-
row.size,
304
-
self.reader.config.limit
305
-
));
308
+
pub(crate) async fn verify_blob_and_make_permanent(
309
+
&self,
310
+
blob: &PreparedBlobRef,
311
+
) -> Result<()> {
312
+
let cid_str = blob.cid.to_string();
313
+
let found = sqlx::query!(r#"SELECT * FROM blob WHERE cid = ?"#, cid_str)
314
+
.fetch_optional(&self.reader.db)
315
+
.await
316
+
.context("failed to fetch blob")?;
317
+
if found.is_none() {
318
+
return Err(anyhow::anyhow!("Blob not found"));
306
319
}
307
-
308
-
// Check MIME type
309
-
if row.mimeType != blob.mime_type {
310
-
return Err(anyhow::anyhow!(
311
-
"MIME type does not match. Expected: {}, got: {}",
312
-
row.mimeType,
313
-
blob.mime_type
314
-
));
320
+
let found = found.unwrap();
321
+
if found.takedownRef.is_some() {
322
+
return Err(anyhow::anyhow!("Blob has been taken down"));
315
323
}
316
-
317
-
// If temp key exists, move to permanent storage
318
-
if let Some(temp_key) = &row.tempKey {
319
-
let temp_path = self.temp_dir.join(temp_key);
320
-
let blob_path = self.blobs_dir.join(format!("{}.blob", cid_str));
321
-
322
-
// Only move if temp file exists and permanent file doesn't
323
-
if fs::try_exists(&temp_path).await.unwrap_or(false)
324
-
&& !fs::try_exists(&blob_path).await.unwrap_or(false)
325
-
{
326
-
// Create parent directories if needed
327
-
if let Some(parent) = blob_path.parent() {
328
-
fs::create_dir_all(parent)
329
-
.await
330
-
.context("failed to create blob directory")?;
331
-
}
332
-
333
-
// Move file from temp to permanent location
334
-
fs::copy(&temp_path, &blob_path)
335
-
.await
336
-
.context("failed to copy blob to permanent storage")?;
337
-
fs::remove_file(&temp_path)
338
-
.await
339
-
.context("failed to remove temporary blob file")?;
340
-
}
341
-
342
-
// Update database
343
-
sqlx::query!(r#"UPDATE blob SET tempKey = NULL WHERE cid = ?"#, cid_str)
344
-
.execute(&self.reader.db)
345
-
.await
346
-
.context("failed to update blob record after making permanent")?;
324
+
if found.tempKey.is_some() {
325
+
todo!("verify_blob"); // verify_blob(blob, found);
326
+
self.reader
327
+
.blobstore
328
+
.make_permanent(found.tempKey.unwrap(), cid_str)
329
+
.await?;
330
+
sqlx::query!(
331
+
r#"UPDATE blob SET tempKey = NULL WHERE tempKey = ?"#,
332
+
found.tempKey
333
+
)
334
+
.execute(&self.reader.db)
335
+
.await
336
+
.context("failed to update blob temp key")?;
347
337
}
348
-
349
338
Ok(())
350
339
}
351
340
352
-
/// Register a blob in the database
353
-
pub(super) async fn register_blob(
341
+
/// Associate a blob with a record
342
+
pub(crate) async fn associate_blob(
354
343
&self,
355
-
cid: String,
356
-
mime_type: String,
357
-
size: u64,
344
+
blob: &PreparedBlobRef,
345
+
record_uri: &str,
358
346
) -> Result<()> {
359
-
self.reader.register_blob(cid, mime_type, size as i64).await
360
-
}
361
-
362
-
/// Associate a blob with a record
363
-
pub(super) async fn associate_blob(&self, cid: &str, record_uri: &str) -> Result<()> {
347
+
let cid = blob.cid.to_string();
364
348
sqlx::query!(
365
349
r#"
366
350
INSERT INTO record_blob (blobCid, recordUri)
+10
-379
src/actor_store/mod.rs
+10
-379
src/actor_store/mod.rs
···
1
1
//! Actor store implementation for ATProto PDS.
2
2
3
+
mod actor_store;
4
+
mod actor_store_reader;
5
+
mod actor_store_transactor;
6
+
mod actor_store_writer;
3
7
mod blob;
4
8
mod db;
5
9
mod preference;
6
10
mod record;
7
11
mod repo;
8
-
9
-
use std::path::PathBuf;
10
-
use std::sync::Arc;
11
-
12
-
use anyhow::{Context as _, Result};
13
-
use atrium_crypto::keypair::Export as _;
14
-
use sqlx::SqlitePool;
15
-
16
-
use crate::SigningKey;
17
-
use crate::config::RepoConfig;
18
-
19
-
/// Resources required by the actor store.
20
-
pub(crate) struct ActorStoreResources {
21
-
/// Configuration for the repo.
22
-
pub(crate) config: RepoConfig,
23
-
/// Configuration for the blob store.
24
-
pub(crate) blob_config: crate::config::BlobConfig,
25
-
/// Background task queue (we'll need to implement this later).
26
-
pub(crate) background_queue: Arc<()>, // TODO: Placeholder until we implement a proper queue
27
-
}
28
-
29
-
/// The location of an actor's data.
30
-
pub(crate) struct ActorLocation {
31
-
/// The directory for the actor's data.
32
-
pub(crate) directory: PathBuf,
33
-
/// The database location for the actor.
34
-
pub(crate) db_location: PathBuf,
35
-
/// The keypair location for the actor.
36
-
pub(crate) key_location: PathBuf,
37
-
}
38
-
39
-
/// The actor store for repository data.
40
-
pub(crate) struct ActorStore {
41
-
/// The directory for actor data.
42
-
pub(crate) directory: PathBuf,
43
-
/// The directory for reserved keys.
44
-
reserved_key_dir: PathBuf,
45
-
/// Resources used by the actor store.
46
-
resources: ActorStoreResources,
47
-
}
48
-
49
-
/// Reader for actor data.
50
-
pub(crate) struct ActorStoreReader {
51
-
/// The DID of the actor.
52
-
pub(crate) did: String,
53
-
/// The database connection.
54
-
pub(crate) db: SqlitePool,
55
-
/// The actor's keypair.
56
-
keypair: Arc<SigningKey>,
57
-
/// Resources for the actor store.
58
-
pub(crate) resources: Arc<ActorStoreResources>,
59
-
/// Repository reader
60
-
pub(crate) repo: repo::RepoReader,
61
-
/// Record reader
62
-
pub(crate) record: record::RecordReader,
63
-
/// Preference reader
64
-
pub(crate) pref: preference::PreferenceReader,
65
-
}
12
+
mod resources;
66
13
67
-
/// Writer for actor data with transaction support.
68
-
pub(crate) struct ActorStoreWriter {
69
-
/// The DID of the actor.
70
-
pub(crate) did: String,
71
-
/// The database connection.
72
-
pub(crate) db: SqlitePool,
73
-
/// The actor's keypair.
74
-
keypair: Arc<SigningKey>,
75
-
/// Resources for the actor store.
76
-
pub(crate) resources: Arc<ActorStoreResources>,
77
-
/// Repository access
78
-
pub(crate) repo: repo::RepoTransactor,
79
-
/// Record access
80
-
pub(crate) record: record::RecordTransactor,
81
-
/// Preference access
82
-
pub(crate) pref: preference::PreferenceTransactor,
83
-
}
84
-
85
-
/// Transactor for actor data.
86
-
pub(crate) struct ActorStoreTransactor {
87
-
/// The DID of the actor.
88
-
pub(crate) did: String,
89
-
/// The database connection.
90
-
pub(crate) db: SqlitePool,
91
-
/// The actor's keypair.
92
-
keypair: Arc<SigningKey>,
93
-
/// Resources for the actor store.
94
-
pub(crate) resources: Arc<ActorStoreResources>,
95
-
/// Repository access
96
-
pub(crate) repo: repo::RepoTransactor,
97
-
/// Record access
98
-
pub(crate) record: record::RecordTransactor,
99
-
/// Preference access
100
-
pub(crate) pref: preference::PreferenceTransactor,
101
-
}
102
-
103
-
impl ActorStore {
104
-
/// Create a new actor store.
105
-
pub(crate) fn new(directory: impl Into<PathBuf>, resources: ActorStoreResources) -> Self {
106
-
let directory = directory.into();
107
-
let reserved_key_dir = directory.join("reserved_keys");
108
-
Self {
109
-
directory,
110
-
reserved_key_dir,
111
-
resources,
112
-
}
113
-
}
114
-
115
-
/// Load an actor store based on a given DID.
116
-
pub(crate) async fn load(did: &str, resources: ActorStoreResources) -> Result<Self> {
117
-
let did_hash = sha256_hex(did).await?;
118
-
let directory = resources.config.path.join(&did_hash[0..2]).join(did);
119
-
let reserved_key_dir = directory.join("reserved_keys");
120
-
121
-
Ok(Self {
122
-
directory,
123
-
reserved_key_dir,
124
-
resources,
125
-
})
126
-
}
127
-
128
-
/// Get the location of a DID's data.
129
-
pub(crate) async fn get_location(&self, did: &str) -> Result<ActorLocation> {
130
-
let did_hash = sha256_hex(did).await?;
131
-
let directory = self.directory.join(&did_hash[0..2]).join(did);
132
-
let db_location = directory.join("store.sqlite");
133
-
let key_location = directory.join("key");
134
-
135
-
Ok(ActorLocation {
136
-
directory,
137
-
db_location,
138
-
key_location,
139
-
})
140
-
}
141
-
142
-
/// Check if an actor exists.
143
-
pub(crate) async fn exists(&self, did: &str) -> Result<bool> {
144
-
let location = self.get_location(did).await?;
145
-
Ok(tokio::fs::try_exists(&location.db_location).await?)
146
-
}
147
-
148
-
/// Get the keypair for an actor.
149
-
pub(crate) async fn keypair(&self, did: &str) -> Result<Arc<SigningKey>> {
150
-
let location = self.get_location(did).await?;
151
-
let priv_key = tokio::fs::read(&location.key_location).await?;
152
-
let keypair = SigningKey::import(&priv_key)?;
153
-
Ok(Arc::new(keypair))
154
-
}
155
-
156
-
/// Open the database for an actor.
157
-
pub(crate) async fn open_db(&self, did: &str) -> Result<SqlitePool> {
158
-
let location = self.get_location(did).await?;
159
-
160
-
if !tokio::fs::try_exists(&location.db_location).await? {
161
-
return Err(anyhow::anyhow!("Repo not found"));
162
-
}
163
-
164
-
let db = sqlx::sqlite::SqlitePoolOptions::new()
165
-
.max_connections(5)
166
-
.connect_with(
167
-
sqlx::sqlite::SqliteConnectOptions::new()
168
-
.filename(&location.db_location)
169
-
.create_if_missing(false),
170
-
)
171
-
.await
172
-
.context("failed to connect to SQLite database")?;
173
-
174
-
Ok(db)
175
-
}
176
-
177
-
/// Read from an actor store.
178
-
pub(crate) async fn read<T, F>(&self, did: &str, f: F) -> Result<T>
179
-
where
180
-
F: FnOnce(ActorStoreReader) -> Result<T>,
181
-
{
182
-
let db = self.open_db(did).await?;
183
-
let keypair = self.keypair(did).await?;
184
-
let resources = Arc::new(self.resources.clone());
185
-
let did_str = did.to_string();
186
-
187
-
let reader = ActorStoreReader {
188
-
did: did_str.clone(),
189
-
repo: repo::RepoReader::new(
190
-
db.clone(),
191
-
did_str.clone(),
192
-
self.resources.blob_config.clone(),
193
-
),
194
-
record: record::RecordReader::new(db.clone(), did_str.clone()),
195
-
pref: preference::PreferenceReader::new(db.clone(), did_str),
196
-
db,
197
-
keypair,
198
-
resources,
199
-
};
200
-
201
-
f(reader)
202
-
}
203
-
204
-
/// Transact against an actor store with full transaction support.
205
-
pub(crate) async fn transact<T, F>(&self, did: &str, f: F) -> Result<T>
206
-
where
207
-
F: FnOnce(ActorStoreTransactor) -> Result<T>,
208
-
{
209
-
let keypair = self.keypair(did).await?;
210
-
let db = self.open_db(did).await?;
211
-
let resources = Arc::new(self.resources.clone());
212
-
let did_str = did.to_string();
213
-
214
-
let transactor = ActorStoreTransactor {
215
-
did: did_str.clone(),
216
-
repo: repo::RepoTransactor::new(
217
-
db.clone(),
218
-
did_str.clone(),
219
-
(*keypair).clone(),
220
-
self.resources.blob_config.clone(),
221
-
),
222
-
record: record::RecordTransactor::new(
223
-
db.clone(),
224
-
db.clone(), // Using db as placeholder for blobstore
225
-
did_str.clone(),
226
-
),
227
-
pref: preference::PreferenceTransactor::new(db.clone(), did_str),
228
-
db,
229
-
keypair,
230
-
resources,
231
-
};
232
-
233
-
f(transactor)
234
-
}
235
-
236
-
/// Write to an actor store without transaction support.
237
-
pub(crate) async fn write_no_transaction<T, F>(&self, did: &str, f: F) -> Result<T>
238
-
where
239
-
F: FnOnce(ActorStoreWriter) -> Result<T>,
240
-
{
241
-
let db = self.open_db(did).await?;
242
-
let keypair = self.keypair(did).await?;
243
-
let resources = Arc::new(self.resources.clone());
244
-
let did_str = did.to_string();
245
-
246
-
let writer = ActorStoreWriter {
247
-
did: did_str.clone(),
248
-
repo: repo::RepoTransactor::new(
249
-
db.clone(),
250
-
did_str.clone(),
251
-
(*keypair).clone(),
252
-
self.resources.blob_config.clone(),
253
-
),
254
-
record: record::RecordTransactor::new(
255
-
db.clone(),
256
-
db.clone(), // Using db as placeholder for blobstore
257
-
did_str.clone(),
258
-
),
259
-
pref: preference::PreferenceTransactor::new(db.clone(), did_str),
260
-
db,
261
-
keypair,
262
-
resources,
263
-
};
264
-
265
-
f(writer)
266
-
}
267
-
268
-
/// Create a new actor repository.
269
-
pub(crate) async fn create(&self, did: &str, keypair: SigningKey) -> Result<()> {
270
-
let location = self.get_location(did).await?;
271
-
272
-
// Create directory if it doesn't exist
273
-
tokio::fs::create_dir_all(&location.directory).await?;
274
-
275
-
// Check if repo already exists
276
-
if tokio::fs::try_exists(&location.db_location).await? {
277
-
return Err(anyhow::anyhow!("Repo already exists"));
278
-
}
279
-
280
-
// Export and save keypair
281
-
let priv_key = keypair.export();
282
-
tokio::fs::write(&location.key_location, priv_key).await?;
283
-
284
-
// Create database
285
-
let db = sqlx::sqlite::SqlitePoolOptions::new()
286
-
.connect_with(
287
-
sqlx::sqlite::SqliteConnectOptions::new()
288
-
.filename(&location.db_location)
289
-
.create_if_missing(true),
290
-
)
291
-
.await
292
-
.context("failed to create SQLite database")?;
293
-
294
-
// Create database schema
295
-
db::create_tables(&db).await?;
296
-
297
-
Ok(())
298
-
}
299
-
300
-
/// Destroy an actor's repository and associated data.
301
-
pub(crate) async fn destroy(&self, did: &str) -> Result<()> {
302
-
// TODO: Implement repository destruction
303
-
// - Delete all blobs for the repository
304
-
// - Remove the repository directory
305
-
todo!("Implement repository destruction")
306
-
}
307
-
308
-
/// Reserve a keypair for a DID.
309
-
pub(crate) async fn reserve_keypair(&self, did: Option<&str>) -> Result<String> {
310
-
// TODO: Implement keypair reservation
311
-
// - Generate a keypair if one doesn't exist
312
-
// - Store the keypair in the reserved_key_dir
313
-
// - Return the DID of the keypair
314
-
todo!("Implement keypair reservation")
315
-
}
316
-
317
-
/// Get a reserved keypair.
318
-
pub(crate) async fn get_reserved_keypair(
319
-
&self,
320
-
signing_key_or_did: &str,
321
-
) -> Result<Option<()>> {
322
-
// TODO: Implement getting a reserved keypair
323
-
// - Load the keypair from the reserved_key_dir
324
-
todo!("Implement getting a reserved keypair")
325
-
}
326
-
327
-
/// Clear a reserved keypair.
328
-
pub(crate) async fn clear_reserved_keypair(
329
-
&self,
330
-
key_did: &str,
331
-
did: Option<&str>,
332
-
) -> Result<()> {
333
-
// TODO: Implement clearing a reserved keypair
334
-
// - Remove the keypair file from the reserved_key_dir
335
-
todo!("Implement clearing a reserved keypair")
336
-
}
337
-
338
-
/// Store a PLC operation.
339
-
pub(crate) async fn store_plc_op(&self, did: &str, op: &[u8]) -> Result<()> {
340
-
// TODO: Implement storing a PLC operation
341
-
// - Store the operation in the actor's directory
342
-
todo!("Implement storing a PLC operation")
343
-
}
344
-
345
-
/// Get a stored PLC operation.
346
-
pub(crate) async fn get_plc_op(&self, did: &str) -> Result<Vec<u8>> {
347
-
// TODO: Implement getting a PLC operation
348
-
// - Retrieve the operation from the actor's directory
349
-
todo!("Implement getting a PLC operation")
350
-
}
351
-
352
-
/// Clear a stored PLC operation.
353
-
pub(crate) async fn clear_plc_op(&self, did: &str) -> Result<()> {
354
-
// TODO: Implement clearing a PLC operation
355
-
// - Remove the operation file from the actor's directory
356
-
todo!("Implement clearing a PLC operation")
357
-
}
358
-
}
359
-
360
-
impl ActorStoreWriter {
361
-
/// Transact with the writer.
362
-
pub(crate) async fn transact<T, F>(&self, f: F) -> Result<T>
363
-
where
364
-
F: FnOnce(ActorStoreTransactor) -> Result<T>,
365
-
{
366
-
todo!("Implement transact method for ActorStoreWriter")
367
-
}
368
-
}
369
-
370
-
impl Clone for ActorStoreResources {
371
-
fn clone(&self) -> Self {
372
-
Self {
373
-
config: self.config.clone(),
374
-
blob_config: self.blob_config.clone(),
375
-
background_queue: self.background_queue.clone(),
376
-
}
377
-
}
378
-
}
379
-
380
-
// Helper function for SHA-256 hashing
381
-
async fn sha256_hex(input: &str) -> Result<String> {
382
-
use sha2::{Digest, Sha256};
383
-
let mut hasher = Sha256::new();
384
-
hasher.update(input.as_bytes());
385
-
let result = hasher.finalize();
386
-
Ok(hex::encode(result))
387
-
}
14
+
pub(crate) use actor_store::ActorStore;
15
+
pub(crate) use actor_store_reader::ActorStoreReader;
16
+
pub(crate) use actor_store_transactor::ActorStoreTransactor;
17
+
pub(crate) use actor_store_writer::ActorStoreWriter;
18
+
pub(crate) use resources::ActorStoreResources;
+11
-13
src/actor_store/repo/reader.rs
+11
-13
src/actor_store/repo/reader.rs
···
5
5
use sqlx::SqlitePool;
6
6
7
7
use super::sql_repo_reader::SqlRepoReader;
8
-
use crate::{config::BlobConfig, repo::block_map::BlockMap};
8
+
use crate::{actor_store::record::RecordReader, config::BlobConfig, repo::block_map::BlockMap};
9
9
10
10
/// Reader for repository data in the actor store.
11
11
pub(crate) struct RepoReader {
12
+
blob: BlobReader,
13
+
record: RecordReader,
12
14
/// The SQL repository reader.
13
-
pub storage: SqlRepoReader,
14
-
/// The database connection.
15
-
pub db: SqlitePool,
16
-
/// The DID of the repository owner.
17
-
pub did: String,
15
+
storage: SqlRepoReader,
18
16
}
19
17
20
18
impl RepoReader {
21
19
/// Create a new repository reader.
22
-
pub(crate) fn new(db: SqlitePool, did: String, blob_config: BlobConfig) -> Self {
23
-
Self {
24
-
storage: SqlRepoReader::new(db.clone(), did.clone()),
25
-
db,
26
-
did,
27
-
}
28
-
}
20
+
// pub(crate) fn new(db: SqlitePool, did: String, blob_config: BlobConfig) -> Self {
21
+
// Self {
22
+
// storage: SqlRepoReader::new(db.clone(), did.clone()),
23
+
// db,
24
+
// did,
25
+
// }
26
+
// }
29
27
30
28
/// Get event data for synchronization.
31
29
pub(crate) async fn get_sync_event_data(&self) -> Result<SyncEventData> {
+20
-2
src/actor_store/repo/sql_repo_transactor.rs
+20
-2
src/actor_store/repo/sql_repo_transactor.rs
···
10
10
use sha2::Digest;
11
11
use sqlx::SqlitePool;
12
12
13
-
use crate::repo::block_map::{BlockMap, CommitData};
13
+
use crate::repo::{block_map::BlockMap, types::CommitData};
14
14
15
-
use super::sql_repo_reader::SqlRepoReader;
15
+
use super::sql_repo_reader::{RootInfo, SqlRepoReader};
16
16
17
17
/// SQL-based repository transactor that extends the reader.
18
18
pub(crate) struct SqlRepoTransactor {
···
33
33
cache: BlockMap::new(),
34
34
now,
35
35
}
36
+
}
37
+
38
+
/// Get the root CID and revision of the repository.
39
+
pub(crate) async fn get_root_detailed(&self) -> Result<RootInfo> {
40
+
let row = sqlx::query!(
41
+
r#"
42
+
SELECT cid, rev
43
+
FROM repo_root
44
+
WHERE did = ?
45
+
LIMIT 1
46
+
"#,
47
+
self.reader.did
48
+
)
49
+
.fetch_one(&self.reader.db)
50
+
.await?;
51
+
52
+
let cid = Cid::from_str(&row.cid)?;
53
+
Ok(RootInfo { cid, rev: row.rev })
36
54
}
37
55
38
56
/// Proactively cache all blocks from a particular commit.
+256
-14
src/actor_store/repo/transactor.rs
+256
-14
src/actor_store/repo/transactor.rs
···
1
1
//! Repository transactor for the actor store.
2
2
3
+
use std::str::FromStr;
4
+
3
5
use anyhow::Result;
4
6
use atrium_repo::Cid;
5
-
use rsky_repo::types::{CommitDataWithOps, PreparedWrite};
6
7
use rsky_syntax::aturi::AtUri;
7
8
use sqlx::SqlitePool;
8
9
9
-
use crate::SigningKey;
10
+
use crate::{
11
+
SigningKey,
12
+
repo::types::{CommitAction, CommitDataWithOps, CommitOp, PreparedWrite, WriteOpAction},
13
+
};
10
14
11
15
use super::{reader::RepoReader, sql_repo_transactor::SqlRepoTransactor};
12
16
13
17
/// Transactor for repository operations.
14
18
pub(crate) struct RepoTransactor {
15
-
/// The repository reader.
16
-
pub reader: RepoReader,
17
-
/// The SQL repository transactor.
18
-
pub storage: SqlRepoTransactor,
19
-
/// The DID of the repository owner.
20
-
pub did: String,
21
-
/// The signing key for the repository.
22
-
pub signing_key: SigningKey,
19
+
/// The inner reader.
20
+
pub(crate) reader: RepoReader,
21
+
///
23
22
}
23
+
24
24
25
25
impl RepoTransactor {
26
26
/// Create a new repository transactor.
···
59
59
writes: Vec<PreparedWrite>,
60
60
swap_commit_cid: Option<Cid>,
61
61
) -> Result<CommitDataWithOps> {
62
-
todo!("Implement process_writes")
62
+
// Validate parameters
63
+
if writes.len() > 200 {
64
+
return Err(anyhow::anyhow!("Too many writes. Max: 200").into());
65
+
}
66
+
67
+
// Format the commit (creates blocks and structures the operations)
68
+
let commit = self.format_commit(writes.clone(), swap_commit_cid).await?;
69
+
70
+
// Check commit size (prevent large commits)
71
+
if commit.commit_data.relevant_blocks.byte_size()? > 2000000 {
72
+
return Err(anyhow::anyhow!("Too many writes. Max event size: 2MB").into());
73
+
}
74
+
75
+
// Execute these operations in parallel for better performance
76
+
tokio::try_join!(
77
+
// Persist the commit to repo storage
78
+
self.storage.apply_commit(commit.commit_data.clone(), true),
79
+
// Send to indexing
80
+
self.index_writes(writes.clone(), &commit.commit_data.rev),
81
+
// Process blobs from writes
82
+
self.process_write_blobs(&commit.commit_data.rev, &writes),
83
+
)?;
84
+
85
+
Ok(commit)
63
86
}
64
87
65
88
/// Format a commit for writing.
···
68
91
writes: Vec<PreparedWrite>,
69
92
swap_commit: Option<Cid>,
70
93
) -> Result<CommitDataWithOps> {
71
-
todo!("Implement format_commit")
94
+
// Get current repository root
95
+
let curr_root = self.storage.get_root_detailed().await?;
96
+
if curr_root.cid.is_nil() {
97
+
return Err(anyhow::anyhow!("No repo root found for {}", self.did).into());
98
+
}
99
+
100
+
// Check swap commit if provided
101
+
if let Some(swap_cid) = swap_commit {
102
+
if !curr_root.cid.equals(swap_cid) {
103
+
return Err(anyhow::anyhow!(
104
+
"Bad commit swap: expected {}, got {}",
105
+
swap_cid,
106
+
curr_root.cid
107
+
)
108
+
.into());
109
+
}
110
+
}
111
+
112
+
// Cache last commit for performance
113
+
self.storage.cache_rev(&curr_root.rev).await?;
114
+
115
+
let mut new_record_cids = Vec::new();
116
+
let mut del_and_update_uris = Vec::new();
117
+
let mut commit_ops = Vec::new();
118
+
119
+
// Process each write to create commit operations
120
+
for write in &writes {
121
+
let uri_str = write.uri().clone();
122
+
let uri = AtUri::try_from(uri_str.as_str())?;
123
+
124
+
match write.action() {
125
+
WriteOpAction::Create | WriteOpAction::Update => {
126
+
if let Some(cid) = write.cid() {
127
+
new_record_cids.push(cid);
128
+
}
129
+
}
130
+
_ => {}
131
+
}
132
+
133
+
if write.action() != &WriteOpAction::Create {
134
+
del_and_update_uris.push(uri.clone());
135
+
}
136
+
137
+
// Get current record if it exists
138
+
let record = self.record.get_record(&uri.to_string(), None, true).await?;
139
+
let curr_record = record.map(|r| Cid::from_str(&r.cid).ok()).flatten();
140
+
141
+
// Create the operation
142
+
let path = format!("{}/{}", uri.get_collection(), uri.get_rkey());
143
+
let mut op = CommitOp {
144
+
action: match write.action() {
145
+
WriteOpAction::Create => CommitAction::Create,
146
+
WriteOpAction::Update => CommitAction::Update,
147
+
WriteOpAction::Delete => CommitAction::Delete,
148
+
},
149
+
path: path.clone(),
150
+
cid: match write.action() {
151
+
WriteOpAction::Delete => None,
152
+
_ => write.cid(),
153
+
},
154
+
prev: curr_record.clone(),
155
+
};
156
+
157
+
// Validate swap consistency
158
+
if let Some(swap_cid) = write.swap_cid() {
159
+
match write.action() {
160
+
WriteOpAction::Create if swap_cid.is_some() => {
161
+
return Err(anyhow::anyhow!(
162
+
"Bad record swap: cannot provide swap CID for create"
163
+
)
164
+
.into());
165
+
}
166
+
WriteOpAction::Update | WriteOpAction::Delete if swap_cid.is_none() => {
167
+
return Err(anyhow::anyhow!(
168
+
"Bad record swap: must provide swap CID for update/delete"
169
+
)
170
+
.into());
171
+
}
172
+
_ => {}
173
+
}
174
+
175
+
if swap_cid.is_some()
176
+
&& curr_record.is_some()
177
+
&& !curr_record.unwrap().equals(swap_cid.unwrap())
178
+
{
179
+
return Err(anyhow::anyhow!(
180
+
"Bad record swap: expected {}, got {:?}",
181
+
swap_cid.unwrap(),
182
+
curr_record
183
+
)
184
+
.into());
185
+
}
186
+
}
187
+
188
+
commit_ops.push(op);
189
+
}
190
+
191
+
// Load the current repository and prepare for modification
192
+
let repo = crate::storage::open_repo(&self.storage.reader.config, &self.did, curr_root.cid)
193
+
.await?;
194
+
let prev_data = repo.commit().data;
195
+
196
+
// Convert PreparedWrites to RecordWriteOps
197
+
let write_ops = writes
198
+
.iter()
199
+
.map(|w| write_to_op(w.clone()))
200
+
.collect::<Result<Vec<_>>>()?;
201
+
202
+
// Format the new commit
203
+
let commit = repo.format_commit(write_ops, &self.signing_key).await?;
204
+
205
+
// Find blocks referenced by other records
206
+
let dupe_record_cids = self
207
+
.get_duplicate_record_cids(&commit.removed_cids.to_list(), &del_and_update_uris)
208
+
.await?;
209
+
210
+
// Remove duplicated CIDs from the removal list
211
+
for cid in dupe_record_cids {
212
+
commit.removed_cids.delete(cid);
213
+
}
214
+
215
+
// Ensure all necessary blocks are included
216
+
let new_record_blocks = commit.relevant_blocks.get_many(new_record_cids)?;
217
+
if !new_record_blocks.missing.is_empty() {
218
+
let missing_blocks = self.storage.get_blocks(new_record_blocks.missing).await?;
219
+
commit.relevant_blocks.add_map(missing_blocks.blocks)?;
220
+
}
221
+
222
+
Ok(CommitDataWithOps {
223
+
commit_data: commit,
224
+
ops: commit_ops,
225
+
prev_data: Some(prev_data),
226
+
})
72
227
}
73
228
74
229
/// Index writes in the repository.
75
230
pub(crate) async fn index_writes(&self, writes: Vec<PreparedWrite>, rev: &str) -> Result<()> {
76
-
todo!("Implement index_writes")
231
+
// Process each write for indexing
232
+
for write in writes {
233
+
let uri_str = write.uri().clone();
234
+
let uri = AtUri::try_from(uri_str.as_str())?;
235
+
236
+
match write.action() {
237
+
WriteOpAction::Create | WriteOpAction::Update => {
238
+
if let PreparedWrite::Create(w) | PreparedWrite::Update(w) = write {
239
+
self.record
240
+
.index_record(
241
+
uri,
242
+
w.cid,
243
+
Some(w.record),
244
+
write.action().clone(),
245
+
rev,
246
+
None, // Use current timestamp
247
+
)
248
+
.await?;
249
+
}
250
+
}
251
+
WriteOpAction::Delete => {
252
+
self.record.delete_record(&uri).await?;
253
+
}
254
+
}
255
+
}
256
+
257
+
Ok(())
77
258
}
78
259
79
260
/// Get duplicate record CIDs.
···
82
263
cids: &[Cid],
83
264
touched_uris: &[AtUri],
84
265
) -> Result<Vec<Cid>> {
85
-
todo!("Implement get_duplicate_record_cids")
266
+
if touched_uris.is_empty() || cids.is_empty() {
267
+
return Ok(Vec::new());
268
+
}
269
+
270
+
let cid_strs: Vec<String> = cids.iter().map(|c| c.to_string()).collect();
271
+
let uri_strs: Vec<String> = touched_uris.iter().map(|u| u.to_string()).collect();
272
+
273
+
// Find records that have the same CIDs but weren't touched in this operation
274
+
let duplicates = sqlx::query_scalar!(
275
+
r#"
276
+
SELECT cid FROM record
277
+
WHERE cid IN (SELECT unnest($1::text[]))
278
+
AND uri NOT IN (SELECT unnest($2::text[]))
279
+
"#,
280
+
&cid_strs,
281
+
&uri_strs
282
+
)
283
+
.fetch_all(&self.reader.db)
284
+
.await?;
285
+
286
+
// Convert string CIDs back to Cid objects
287
+
let dupe_cids = duplicates
288
+
.into_iter()
289
+
.filter_map(|cid_str| Cid::from_str(&cid_str).ok())
290
+
.collect();
291
+
292
+
Ok(dupe_cids)
293
+
}
294
+
295
+
pub(crate) async fn process_write_blobs(
296
+
&self,
297
+
rev: &str,
298
+
writes: &[PreparedWrite],
299
+
) -> Result<()> {
300
+
// First handle deletions or updates
301
+
let uris: Vec<String> = writes
302
+
.iter()
303
+
.filter(|w| matches!(w.action(), WriteOpAction::Delete | WriteOpAction::Update))
304
+
.map(|w| w.uri().clone())
305
+
.collect();
306
+
307
+
if !uris.is_empty() {
308
+
// Remove blob references for deleted/updated records
309
+
self.blob.delete_dereferenced_blobs(&uris).await?;
310
+
}
311
+
312
+
// Process each blob in each write
313
+
for write in writes {
314
+
if let PreparedWrite::Create(w) | PreparedWrite::Update(w) = write {
315
+
for blob in &w.blobs {
316
+
// Verify and make permanent
317
+
self.blob.verify_blob_and_make_permanent(blob).await?;
318
+
319
+
// Associate blob with record
320
+
self.blob
321
+
.associate_blob(&blob.cid.to_string(), write.uri())
322
+
.await?;
323
+
}
324
+
}
325
+
}
326
+
327
+
Ok(())
86
328
}
87
329
}
+20
src/actor_store/resources.rs
+20
src/actor_store/resources.rs
···
1
+
2
+
use std::sync::Arc;
3
+
4
+
use crate::config::{BlobConfig, RepoConfig};
5
+
6
+
pub(crate) struct ActorStoreResources {
7
+
pub(crate) config: RepoConfig,
8
+
pub(crate) blob_config: BlobConfig,
9
+
pub(crate) background_queue: Arc<()>, // Placeholder
10
+
}
11
+
12
+
impl Clone for ActorStoreResources {
13
+
fn clone(&self) -> Self {
14
+
Self {
15
+
config: self.config.clone(),
16
+
blob_config: self.blob_config.clone(),
17
+
background_queue: self.background_queue.clone(),
18
+
}
19
+
}
20
+
}
-35
src/repo/block_map.rs
-35
src/repo/block_map.rs
···
10
10
use std::collections::{BTreeMap, HashSet};
11
11
use std::str::FromStr;
12
12
13
-
/// Ref: https://github.com/blacksky-algorithms/rsky/blob/main/rsky-repo/src/types.rs#L341C1-L350C2
14
-
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
15
-
pub(crate) struct CommitData {
16
-
pub cid: Cid,
17
-
pub rev: String,
18
-
pub since: Option<String>,
19
-
pub prev: Option<Cid>,
20
-
pub new_blocks: BlockMap,
21
-
pub relevant_blocks: BlockMap,
22
-
pub removed_cids: CidSet,
23
-
}
24
-
25
-
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
26
-
pub enum CommitAction {
27
-
Create,
28
-
Update,
29
-
Delete,
30
-
}
31
-
32
-
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
33
-
pub struct CommitOp {
34
-
pub action: CommitAction,
35
-
pub path: String,
36
-
pub cid: Option<Cid>,
37
-
pub prev: Option<Cid>,
38
-
}
39
-
40
-
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
41
-
pub struct CommitDataWithOps {
42
-
#[serde(flatten)]
43
-
pub commit_data: CommitData,
44
-
pub ops: Vec<CommitOp>,
45
-
pub prev_data: Option<Cid>,
46
-
}
47
-
48
13
/// Ref: https://github.com/blacksky-algorithms/rsky/blob/main/rsky-repo/src/cid_set.rs
49
14
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
50
15
pub(crate) struct CidSet {
+9
src/repo/types.rs
+9
src/repo/types.rs
···
163
163
PreparedWrite::Delete(w) => &w.action,
164
164
}
165
165
}
166
+
167
+
/// TEQ: Add blobs() impl
168
+
pub fn blobs(&self) -> Option<&Vec<PreparedBlobRef>> {
169
+
match self {
170
+
PreparedWrite::Create(w) => Some(&w.blobs),
171
+
PreparedWrite::Update(w) => Some(&w.blobs),
172
+
PreparedWrite::Delete(_) => None,
173
+
}
174
+
}
166
175
}
167
176
168
177
impl From<&PreparedWrite> for CommitAction {