//! Forked from oyster.cafe/bspds-sandbox.
//!
//! PDS software with bells & whistles you didn't even know you needed.
//! Will move this to its own account when ready.
1use bytes::Bytes;
2use cid::Cid;
3use ipld_core::ipld::Ipld;
4use iroh_car::CarReader;
5use serde_json::Value as JsonValue;
6use sqlx::PgPool;
7use std::collections::HashMap;
8use std::io::Cursor;
9use thiserror::Error;
10use tracing::debug;
11use uuid::Uuid;
12
/// Errors that can arise while parsing a CAR file and applying it as a
/// repository import.
#[derive(Error, Debug)]
pub enum ImportError {
    /// The CAR container itself could not be decoded.
    #[error("CAR parsing error: {0}")]
    CarParse(String),
    /// An import CAR must carry exactly one root (the commit block).
    #[error("Expected exactly one root in CAR file")]
    InvalidRootCount,
    /// A CID referenced during the import was missing from the CAR's blocks.
    #[error("Block not found: {0}")]
    BlockNotFound(String),
    /// A block failed DAG-CBOR decoding.
    #[error("Invalid CBOR: {0}")]
    InvalidCbor(String),
    /// Underlying database failure.
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
    /// Failure reading from or writing to the block store.
    #[error("Block store error: {0}")]
    BlockStore(String),
    /// The CAR contained more blocks than the configured limit.
    #[error("Import size limit exceeded")]
    SizeLimitExceeded,
    /// No repo row exists for the authenticated user.
    #[error("Repo not found")]
    RepoNotFound,
    /// Another session holds the repo row lock (Postgres error 55P03).
    #[error("Concurrent modification detected")]
    ConcurrentModification,
    /// The root commit object was structurally invalid.
    #[error("Invalid commit structure: {0}")]
    InvalidCommit(String),
    /// Repository verification failed.
    #[error("Verification failed: {0}")]
    VerificationFailed(#[from] super::verify::VerifyError),
    /// The CAR's DID does not match the authenticated caller's DID.
    #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")]
    DidMismatch { car_did: String, auth_did: String },
}
40
/// A reference to a blob found inside a record.
#[derive(Debug, Clone)]
pub struct BlobRef {
    // CID of the blob, as a string.
    pub cid: String,
    // MIME type from the record's "mimeType" field, when present.
    pub mime_type: Option<String>,
}
46
47pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> {
48 let cursor = Cursor::new(data);
49 let mut reader = CarReader::new(cursor)
50 .await
51 .map_err(|e| ImportError::CarParse(e.to_string()))?;
52 let header = reader.header();
53 let roots = header.roots();
54 if roots.len() != 1 {
55 return Err(ImportError::InvalidRootCount);
56 }
57 let root = roots[0];
58 let mut blocks = HashMap::new();
59 while let Ok(Some((cid, block))) = reader.next_block().await {
60 blocks.insert(cid, Bytes::from(block));
61 }
62 if !blocks.contains_key(&root) {
63 return Err(ImportError::BlockNotFound(root.to_string()));
64 }
65 Ok((root, blocks))
66}
67
68pub fn find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> {
69 if depth > 32 {
70 return vec![];
71 }
72 match value {
73 Ipld::List(arr) => arr
74 .iter()
75 .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
76 .collect(),
77 Ipld::Map(obj) => {
78 if let Some(Ipld::String(type_str)) = obj.get("$type")
79 && type_str == "blob"
80 {
81 let cid_str = if let Some(Ipld::Link(link_cid)) = obj.get("ref") {
82 Some(link_cid.to_string())
83 } else if let Some(Ipld::Map(ref_obj)) = obj.get("ref")
84 && let Some(Ipld::String(link)) = ref_obj.get("$link")
85 {
86 Some(link.clone())
87 } else {
88 None
89 };
90
91 if let Some(cid) = cid_str {
92 let mime = obj.get("mimeType").and_then(|v| {
93 if let Ipld::String(s) = v {
94 Some(s.clone())
95 } else {
96 None
97 }
98 });
99 return vec![BlobRef {
100 cid,
101 mime_type: mime,
102 }];
103 }
104 }
105 obj.values()
106 .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
107 .collect()
108 }
109 _ => vec![],
110 }
111}
112
113pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> {
114 if depth > 32 {
115 return vec![];
116 }
117 match value {
118 JsonValue::Array(arr) => arr
119 .iter()
120 .flat_map(|v| find_blob_refs(v, depth + 1))
121 .collect(),
122 JsonValue::Object(obj) => {
123 if let Some(JsonValue::String(type_str)) = obj.get("$type")
124 && type_str == "blob"
125 && let Some(JsonValue::Object(ref_obj)) = obj.get("ref")
126 && let Some(JsonValue::String(link)) = ref_obj.get("$link")
127 {
128 let mime = obj
129 .get("mimeType")
130 .and_then(|v| v.as_str())
131 .map(String::from);
132 return vec![BlobRef {
133 cid: link.clone(),
134 mime_type: mime,
135 }];
136 }
137 obj.values()
138 .flat_map(|v| find_blob_refs(v, depth + 1))
139 .collect()
140 }
141 _ => vec![],
142 }
143}
144
145pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) {
146 match value {
147 Ipld::Link(cid) => {
148 links.push(*cid);
149 }
150 Ipld::Map(map) => {
151 for v in map.values() {
152 extract_links(v, links);
153 }
154 }
155 Ipld::List(arr) => {
156 for v in arr {
157 extract_links(v, links);
158 }
159 }
160 _ => {}
161 }
162}
163
/// A single record recovered from the imported MST.
#[derive(Debug)]
pub struct ImportedRecord {
    // NSID collection portion of the MST key (everything before the last '/').
    pub collection: String,
    // Record key (the last '/'-separated segment of the MST key).
    pub rkey: String,
    // CID of the record's block in the CAR.
    pub cid: Cid,
    // Blob references found inside the record's body.
    pub blob_refs: Vec<BlobRef>,
}
171
/// Walk the MST rooted at `root_cid` and return every record it contains,
/// in traversal order.
///
/// # Errors
/// `BlockNotFound` if an MST node block is missing; `InvalidCbor` if a node
/// fails DAG-CBOR decoding.
pub fn walk_mst(
    blocks: &HashMap<Cid, Bytes>,
    root_cid: &Cid,
) -> Result<Vec<ImportedRecord>, ImportError> {
    let mut records = Vec::new();
    // Start with an empty previous key; entries prefix-compress against it.
    walk_mst_node(blocks, root_cid, &[], &mut records)?;
    Ok(records)
}
180
181fn walk_mst_node(
182 blocks: &HashMap<Cid, Bytes>,
183 cid: &Cid,
184 prev_key: &[u8],
185 records: &mut Vec<ImportedRecord>,
186) -> Result<(), ImportError> {
187 let block = blocks
188 .get(cid)
189 .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?;
190 let value: Ipld = serde_ipld_dagcbor::from_slice(block)
191 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
192
193 if let Ipld::Map(ref obj) = value {
194 if let Some(Ipld::Link(left_cid)) = obj.get("l") {
195 walk_mst_node(blocks, left_cid, prev_key, records)?;
196 }
197
198 let mut current_key = prev_key.to_vec();
199
200 if let Some(Ipld::List(entries)) = obj.get("e") {
201 for entry in entries {
202 if let Ipld::Map(entry_obj) = entry {
203 let prefix_len = entry_obj
204 .get("p")
205 .and_then(|p| {
206 if let Ipld::Integer(n) = p {
207 Some(*n as usize)
208 } else {
209 None
210 }
211 })
212 .unwrap_or(0);
213
214 let key_suffix = entry_obj.get("k").and_then(|k| {
215 if let Ipld::Bytes(b) = k {
216 Some(b.clone())
217 } else {
218 None
219 }
220 });
221
222 if let Some(suffix) = key_suffix {
223 current_key.truncate(prefix_len);
224 current_key.extend_from_slice(&suffix);
225 }
226
227 if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
228 walk_mst_node(blocks, tree_cid, ¤t_key, records)?;
229 }
230
231 let record_cid = entry_obj.get("v").and_then(|v| {
232 if let Ipld::Link(cid) = v {
233 Some(*cid)
234 } else {
235 None
236 }
237 });
238
239 if let Some(record_cid) = record_cid
240 && let Ok(full_key) = String::from_utf8(current_key.clone())
241 && let Some(record_block) = blocks.get(&record_cid)
242 && let Ok(record_value) =
243 serde_ipld_dagcbor::from_slice::<Ipld>(record_block)
244 {
245 let blob_refs = find_blob_refs_ipld(&record_value, 0);
246 let parts: Vec<&str> = full_key.split('/').collect();
247 if parts.len() >= 2 {
248 let collection = parts[..parts.len() - 1].join("/");
249 let rkey = parts[parts.len() - 1].to_string();
250 records.push(ImportedRecord {
251 collection,
252 rkey,
253 cid: record_cid,
254 blob_refs,
255 });
256 }
257 }
258 }
259 }
260 }
261 }
262 Ok(())
263}
264
/// Revision metadata extracted from a repo commit object.
pub struct CommitInfo {
    // "rev" string from the commit, when present.
    pub rev: Option<String>,
    // "prev" commit CID as a string; None when absent or explicitly null.
    pub prev: Option<String>,
}
269
/// Outcome of a successful import.
pub struct ImportResult {
    // All records recovered from the imported MST.
    pub records: Vec<ImportedRecord>,
    // CID of the MST data root referenced by the commit.
    pub data_cid: Cid,
}
274
275fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> {
276 let obj = match commit {
277 Ipld::Map(m) => m,
278 _ => {
279 return Err(ImportError::InvalidCommit(
280 "Commit must be a map".to_string(),
281 ));
282 }
283 };
284 let data_cid = obj
285 .get("data")
286 .and_then(|d| {
287 if let Ipld::Link(cid) = d {
288 Some(*cid)
289 } else {
290 None
291 }
292 })
293 .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?;
294 let rev = obj.get("rev").and_then(|r| {
295 if let Ipld::String(s) = r {
296 Some(s.clone())
297 } else {
298 None
299 }
300 });
301 let prev = obj.get("prev").and_then(|p| {
302 if let Ipld::Link(cid) = p {
303 Some(cid.to_string())
304 } else if let Ipld::Null = p {
305 None
306 } else {
307 None
308 }
309 });
310 Ok((data_cid, CommitInfo { rev, prev }))
311}
312
/// Atomically replace a user's repository contents from a parsed CAR import.
///
/// Pipeline: enforce the block-count limit, decode the root commit, walk the
/// MST to collect records, then inside one transaction: lock the repo row,
/// upsert every block, delete the user's existing record rows, and insert
/// the imported ones.
///
/// # Errors
/// `SizeLimitExceeded` if `blocks` exceeds `max_blocks`; `RepoNotFound` if
/// the user has no repo row; `ConcurrentModification` if another session
/// holds the repo row lock; decoding errors (`BlockNotFound`, `InvalidCbor`,
/// `InvalidCommit`) for a malformed CAR; `Database` for other sqlx failures.
pub async fn apply_import(
    db: &PgPool,
    user_id: Uuid,
    root: Cid,
    blocks: HashMap<Cid, Bytes>,
    max_blocks: usize,
) -> Result<ImportResult, ImportError> {
    if blocks.len() > max_blocks {
        return Err(ImportError::SizeLimitExceeded);
    }
    let root_block = blocks
        .get(&root)
        .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?;
    let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block)
        .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
    let (data_cid, _commit_info) = extract_commit_info(&commit)?;
    let records = walk_mst(&blocks, &data_cid)?;
    debug!(
        "Importing {} blocks and {} records for user {}",
        blocks.len(),
        records.len(),
        user_id
    );
    let mut tx = db.begin().await?;
    // Lock the repo row without waiting; Postgres raises 55P03 when the row
    // is already locked, which is surfaced as ConcurrentModification.
    let repo = sqlx::query!(
        "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
        user_id
    )
    .fetch_optional(&mut *tx)
    .await
    .map_err(|e| {
        if let sqlx::Error::Database(ref db_err) = e
            && db_err.code().as_deref() == Some("55P03")
        {
            return ImportError::ConcurrentModification;
        }
        ImportError::Database(e)
    })?;
    // The selected repo_root_cid is only used as an existence check here.
    if repo.is_none() {
        return Err(ImportError::RepoNotFound);
    }
    // NOTE(review): this chunking does not batch anything — each row below is
    // still a separate INSERT round-trip. Kept as-is in this doc-only pass.
    let block_chunks: Vec<Vec<(&Cid, &Bytes)>> = blocks
        .iter()
        .collect::<Vec<_>>()
        .chunks(100)
        .map(|c| c.to_vec())
        .collect();
    for chunk in block_chunks {
        for (cid, data) in chunk {
            let cid_bytes = cid.to_bytes();
            // A conflicting CID means the block already exists; skip it.
            sqlx::query!(
                "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
                &cid_bytes,
                data.as_ref()
            )
            .execute(&mut *tx)
            .await?;
        }
    }
    // Imports replace the repo wholesale: clear old records, then re-insert.
    sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
        .execute(&mut *tx)
        .await?;
    for record in &records {
        let record_cid_str = record.cid.to_string();
        sqlx::query!(
            r#"
            INSERT INTO records (repo_id, collection, rkey, record_cid)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4
            "#,
            user_id,
            record.collection,
            record.rkey,
            record_cid_str
        )
        .execute(&mut *tx)
        .await?;
    }
    tx.commit().await?;
    debug!(
        "Successfully imported {} blocks and {} records",
        blocks.len(),
        records.len()
    );
    Ok(ImportResult { records, data_cid })
}
399
#[cfg(test)]
mod tests {
    use super::*;

    /// A blob embedded in a post is discovered with its CID and MIME type.
    #[test]
    fn test_find_blob_refs() {
        let cid = "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku";
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world",
            "embed": {
                "$type": "app.bsky.embed.images",
                "images": [
                    {
                        "alt": "Test image",
                        "image": {
                            "$type": "blob",
                            "ref": { "$link": cid },
                            "mimeType": "image/jpeg",
                            "size": 12345
                        }
                    }
                ]
            }
        });
        let refs = find_blob_refs(&record, 0);
        assert_eq!(refs.len(), 1);
        assert_eq!(refs[0].cid, cid);
        assert_eq!(refs[0].mime_type.as_deref(), Some("image/jpeg"));
    }

    /// A record without any blob shapes yields no references.
    #[test]
    fn test_find_blob_refs_no_blobs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world"
        });
        assert!(find_blob_refs(&record, 0).is_empty());
    }

    /// A blob buried deeper than the 32-level cap is not discovered.
    #[test]
    fn test_find_blob_refs_depth_limit() {
        // Start with a blob object and wrap it 40 levels deep.
        let mut value = serde_json::json!({
            "$type": "blob",
            "ref": { "$link": "bafkreitest" },
            "mimeType": "image/png"
        });
        for _ in 0..40 {
            value = serde_json::json!({ "nested": value });
        }
        assert!(find_blob_refs(&value, 0).is_empty());
    }
}