PDS software with bells & whistles you didn’t even know you needed. Will move this to its own account when ready.
use bytes::Bytes;
use cid::Cid;
use ipld_core::ipld::Ipld;
use iroh_car::CarReader;
use serde_json::Value as JsonValue;
use sqlx::PgPool;
use std::collections::HashMap;
use std::io::Cursor;
use thiserror::Error;
use tracing::debug;
use uuid::Uuid;

/// Errors raised while parsing and applying a repository import.
#[derive(Error, Debug)]
pub enum ImportError {
    #[error("CAR parsing error: {0}")]
    CarParse(String),
    #[error("Expected exactly one root in CAR file")]
    InvalidRootCount,
    #[error("Block not found: {0}")]
    BlockNotFound(String),
    #[error("Invalid CBOR: {0}")]
    InvalidCbor(String),
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
    #[error("Block store error: {0}")]
    BlockStore(String),
    #[error("Import size limit exceeded")]
    SizeLimitExceeded,
    #[error("Repo not found")]
    RepoNotFound,
    #[error("Concurrent modification detected")]
    ConcurrentModification,
    #[error("Invalid commit structure: {0}")]
    InvalidCommit(String),
    #[error("Verification failed: {0}")]
    VerificationFailed(#[from] super::verify::VerifyError),
    #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")]
    DidMismatch { car_did: String, auth_did: String },
}

/// A blob reference (CID plus optional MIME type) found inside a record.
#[derive(Debug, Clone)]
pub struct BlobRef {
    pub cid: String,
    pub mime_type: Option<String>,
}

/// Parse a CAR file, returning its single root CID and all contained blocks.
pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> {
    let cursor = Cursor::new(data);
    let mut reader = CarReader::new(cursor)
        .await
        .map_err(|e| ImportError::CarParse(e.to_string()))?;
    let header = reader.header();
    let roots = header.roots();
    if roots.len() != 1 {
        return Err(ImportError::InvalidRootCount);
    }
    let root = roots[0];
    let mut blocks = HashMap::new();
    // Read blocks until the stream ends (or the first read error); a CAR
    // that fails to deliver its root block is rejected below.
    while let Ok(Some((cid, block))) = reader.next_block().await {
        blocks.insert(cid, Bytes::from(block));
    }
    if !blocks.contains_key(&root) {
        return Err(ImportError::BlockNotFound(root.to_string()));
    }
    Ok((root, blocks))
}

/// Recursively collect blob references (`$type: "blob"`) from an IPLD value.
/// Traversal depth is capped at 32 to bound work on untrusted input.
pub fn find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> {
    if depth > 32 {
        return vec![];
    }
    match value {
        Ipld::List(arr) => arr
            .iter()
            .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
            .collect(),
        Ipld::Map(obj) => {
            if let Some(Ipld::String(type_str)) = obj.get("$type")
                && type_str == "blob"
            {
                // The blob's `ref` may be a native IPLD link or a JSON-style
                // `{ "$link": ... }` map.
                let cid_str = if let Some(Ipld::Link(link_cid)) = obj.get("ref") {
                    Some(link_cid.to_string())
                } else if let Some(Ipld::Map(ref_obj)) = obj.get("ref")
                    && let Some(Ipld::String(link)) = ref_obj.get("$link")
                {
                    Some(link.clone())
                } else {
                    None
                };

                if let Some(cid) = cid_str {
                    let mime = obj.get("mimeType").and_then(|v| {
                        if let Ipld::String(s) = v {
                            Some(s.clone())
                        } else {
                            None
                        }
                    });
                    return vec![BlobRef {
                        cid,
                        mime_type: mime,
                    }];
                }
            }
            obj.values()
                .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
                .collect()
        }
        _ => vec![],
    }
}

/// JSON counterpart of `find_blob_refs_ipld`, with the same depth cap.
pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> {
    if depth > 32 {
        return vec![];
    }
    match value {
        JsonValue::Array(arr) => arr
            .iter()
            .flat_map(|v| find_blob_refs(v, depth + 1))
            .collect(),
        JsonValue::Object(obj) => {
            if let Some(JsonValue::String(type_str)) = obj.get("$type")
                && type_str == "blob"
                && let Some(JsonValue::Object(ref_obj)) = obj.get("ref")
                && let Some(JsonValue::String(link)) = ref_obj.get("$link")
            {
                let mime = obj
                    .get("mimeType")
                    .and_then(|v| v.as_str())
                    .map(String::from);
                return vec![BlobRef {
                    cid: link.clone(),
                    mime_type: mime,
                }];
            }
            obj.values()
                .flat_map(|v| find_blob_refs(v, depth + 1))
                .collect()
        }
        _ => vec![],
    }
}

/// Collect every CID link reachable from an IPLD value into `links`.
pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) {
    match value {
        Ipld::Link(cid) => {
            links.push(*cid);
        }
        Ipld::Map(map) => {
            for v in map.values() {
                extract_links(v, links);
            }
        }
        Ipld::List(arr) => {
            for v in arr {
                extract_links(v, links);
            }
        }
        _ => {}
    }
}

/// A record discovered while walking the MST, keyed by `collection/rkey`.
#[derive(Debug)]
pub struct ImportedRecord {
    pub collection: String,
    pub rkey: String,
    pub cid: Cid,
    pub blob_refs: Vec<BlobRef>,
}

/// Walk the Merkle Search Tree rooted at `root_cid` and collect all records.
pub fn walk_mst(
    blocks: &HashMap<Cid, Bytes>,
    root_cid: &Cid,
) -> Result<Vec<ImportedRecord>, ImportError> {
    let mut records = Vec::new();
    walk_mst_node(blocks, root_cid, &[], &mut records)?;
    Ok(records)
}

fn walk_mst_node(
    blocks: &HashMap<Cid, Bytes>,
    cid: &Cid,
    prev_key: &[u8],
    records: &mut Vec<ImportedRecord>,
) -> Result<(), ImportError> {
    let block = blocks
        .get(cid)
        .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?;
    let value: Ipld = serde_ipld_dagcbor::from_slice(block)
        .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;

    if let Ipld::Map(ref obj) = value {
        // The leftmost subtree (`l`) sorts before every entry in this node.
        if let Some(Ipld::Link(left_cid)) = obj.get("l") {
            walk_mst_node(blocks, left_cid, prev_key, records)?;
        }

        let mut current_key = prev_key.to_vec();

        if let Some(Ipld::List(entries)) = obj.get("e") {
            for entry in entries {
                if let Ipld::Map(entry_obj) = entry {
                    let prefix_len = entry_obj
                        .get("p")
                        .and_then(|p| {
                            if let Ipld::Integer(n) = p {
                                Some(*n as usize)
                            } else {
                                None
                            }
                        })
                        .unwrap_or(0);

                    let key_suffix = entry_obj.get("k").and_then(|k| {
                        if let Ipld::Bytes(b) = k {
                            Some(b.clone())
                        } else {
                            None
                        }
                    });

                    // Keys are prefix-compressed: keep the first `p` bytes of
                    // the previous key, then append the `k` suffix.
                    if let Some(suffix) = key_suffix {
                        current_key.truncate(prefix_len);
                        current_key.extend_from_slice(&suffix);
                    }

                    // Each entry may carry a right subtree (`t`) sorting
                    // after its key.
                    if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                        walk_mst_node(blocks, tree_cid, &current_key, records)?;
                    }

                    let record_cid = entry_obj.get("v").and_then(|v| {
                        if let Ipld::Link(cid) = v {
                            Some(*cid)
                        } else {
                            None
                        }
                    });

                    if let Some(record_cid) = record_cid
                        && let Ok(full_key) = String::from_utf8(current_key.clone())
                        && let Some(record_block) = blocks.get(&record_cid)
                        && let Ok(record_value) =
                            serde_ipld_dagcbor::from_slice::<Ipld>(record_block)
                    {
                        let blob_refs = find_blob_refs_ipld(&record_value, 0);
                        let parts: Vec<&str> = full_key.split('/').collect();
                        if parts.len() >= 2 {
                            let collection = parts[..parts.len() - 1].join("/");
                            let rkey = parts[parts.len() - 1].to_string();
                            records.push(ImportedRecord {
                                collection,
                                rkey,
                                cid: record_cid,
                                blob_refs,
                            });
                        }
                    }
                }
            }
        }
    }
    Ok(())
}

pub struct CommitInfo {
    pub rev: Option<String>,
    pub prev: Option<String>,
}

pub struct ImportResult {
    pub records: Vec<ImportedRecord>,
    pub data_cid: Cid,
}

/// Extract the `data` root plus the `rev` and `prev` fields from a commit object.
fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> {
    let obj = match commit {
        Ipld::Map(m) => m,
        _ => {
            return Err(ImportError::InvalidCommit(
                "Commit must be a map".to_string(),
            ));
        }
    };
    let data_cid = obj
        .get("data")
        .and_then(|d| {
            if let Ipld::Link(cid) = d {
                Some(*cid)
            } else {
                None
            }
        })
        .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?;
    let rev = obj.get("rev").and_then(|r| {
        if let Ipld::String(s) = r {
            Some(s.clone())
        } else {
            None
        }
    });
    // `prev` is either a link or null; any non-link value maps to None.
    let prev = obj.get("prev").and_then(|p| {
        if let Ipld::Link(cid) = p {
            Some(cid.to_string())
        } else {
            None
        }
    });
    Ok((data_cid, CommitInfo { rev, prev }))
}

/// Import parsed CAR contents for `user_id`: lock the repo row, store all
/// blocks idempotently, and replace the repo's record set, all within a
/// single transaction.
pub async fn apply_import(
    db: &PgPool,
    user_id: Uuid,
    root: Cid,
    blocks: HashMap<Cid, Bytes>,
    max_blocks: usize,
) -> Result<ImportResult, ImportError> {
    if blocks.len() > max_blocks {
        return Err(ImportError::SizeLimitExceeded);
    }
    let root_block = blocks
        .get(&root)
        .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?;
    let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block)
        .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
    let (data_cid, _commit_info) = extract_commit_info(&commit)?;
    let records = walk_mst(&blocks, &data_cid)?;
    debug!(
        "Importing {} blocks and {} records for user {}",
        blocks.len(),
        records.len(),
        user_id
    );
    let mut tx = db.begin().await?;
    // Take the repo row lock without waiting; SQLSTATE 55P03 (lock not
    // available) indicates a concurrent import in flight.
    let repo = sqlx::query!(
        "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
        user_id
    )
    .fetch_optional(&mut *tx)
    .await
    .map_err(|e| {
        if let sqlx::Error::Database(ref db_err) = e
            && db_err.code().as_deref() == Some("55P03")
        {
            return ImportError::ConcurrentModification;
        }
        ImportError::Database(e)
    })?;
    if repo.is_none() {
        return Err(ImportError::RepoNotFound);
    }
    // Iterate blocks in chunks of 100; each block still gets its own INSERT
    // statement, all inside the surrounding transaction.
    let block_chunks: Vec<Vec<(&Cid, &Bytes)>> = blocks
        .iter()
        .collect::<Vec<_>>()
        .chunks(100)
        .map(|c| c.to_vec())
        .collect();
    for chunk in block_chunks {
        for (cid, data) in chunk {
            let cid_bytes = cid.to_bytes();
            sqlx::query!(
                "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
                &cid_bytes,
                data.as_ref()
            )
            .execute(&mut *tx)
            .await?;
        }
    }
    sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
        .execute(&mut *tx)
        .await?;
    for record in &records {
        let record_cid_str = record.cid.to_string();
        sqlx::query!(
            r#"
            INSERT INTO records (repo_id, collection, rkey, record_cid)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4
            "#,
            user_id,
            record.collection,
            record.rkey,
            record_cid_str
        )
        .execute(&mut *tx)
        .await?;
    }
    tx.commit().await?;
    debug!(
        "Successfully imported {} blocks and {} records",
        blocks.len(),
        records.len()
    );
    Ok(ImportResult { records, data_cid })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_find_blob_refs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world",
            "embed": {
                "$type": "app.bsky.embed.images",
                "images": [
                    {
                        "alt": "Test image",
                        "image": {
                            "$type": "blob",
                            "ref": {
                                "$link": "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
                            },
                            "mimeType": "image/jpeg",
                            "size": 12345
                        }
                    }
                ]
            }
        });
        let blob_refs = find_blob_refs(&record, 0);
        assert_eq!(blob_refs.len(), 1);
        assert_eq!(
            blob_refs[0].cid,
            "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
        );
        assert_eq!(blob_refs[0].mime_type, Some("image/jpeg".to_string()));
    }

    #[test]
    fn test_find_blob_refs_no_blobs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world"
        });
        let blob_refs = find_blob_refs(&record, 0);
        assert!(blob_refs.is_empty());
    }

    #[test]
    fn test_find_blob_refs_depth_limit() {
        fn deeply_nested(depth: usize) -> JsonValue {
            if depth == 0 {
                serde_json::json!({
                    "$type": "blob",
                    "ref": { "$link": "bafkreitest" },
                    "mimeType": "image/png" 
                })
            } else {
                serde_json::json!({ "nested": deeply_nested(depth - 1) })
            }
        }
        let deep = deeply_nested(40);
        let blob_refs = find_blob_refs(&deep, 0);
        assert!(blob_refs.is_empty());
    }
}
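For context, a minimal sketch of how the pieces above compose, assuming a hypothetical `import_repo` entry point and `MAX_IMPORT_BLOCKS` limit (neither appears in this file); resolving the authenticated user's id is left to the caller:

// Usage sketch, not part of this module. `import_repo` and
// MAX_IMPORT_BLOCKS are hypothetical names; choose a limit that suits
// your deployment.
const MAX_IMPORT_BLOCKS: usize = 10_000;

async fn import_repo(
    db: &sqlx::PgPool,
    user_id: uuid::Uuid,
    car_bytes: &[u8],
) -> Result<ImportResult, ImportError> {
    // Parse the uploaded CAR into its root commit CID and block map.
    let (root, blocks) = parse_car(car_bytes).await?;
    // Atomically replace the repo's contents; fails fast with
    // ConcurrentModification if another import holds the repo row lock.
    apply_import(db, user_id, root, blocks, MAX_IMPORT_BLOCKS).await
}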