+7
migrations/0002_cid-for-records.sql
+7
migrations/0002_cid-for-records.sql
+23
-9
src/backfill/mod.rs
+23
-9
src/backfill/mod.rs
···
8
8
9
9
use std::str::FromStr;
10
10
11
+
use ipld_core::cid::multibase::Base;
11
12
use jacquard::url::Url;
12
13
use sqlx::{Pool, Postgres, query};
13
14
use thiserror::Error;
···
20
21
pub mod load_car;
21
22
pub mod parse_car;
22
23
23
-
const DB_MAX_REQ: usize = 65535;
24
-
25
24
#[derive(Error, Debug)]
26
25
pub enum Error {
27
26
#[error("Error parsing TID: {}", .0)]
···
32
31
Db(#[from] sqlx::Error),
33
32
#[error("{}", .0)]
34
33
ParseCar(#[from] crate::backfill::parse_car::Error),
34
+
#[error("Error processing cid: {}", .0)]
35
+
Cid(#[from] ipld_core::cid::Error),
35
36
}
36
37
37
38
pub async fn backfill(
···
52
53
// only real overhead is network latency which would be ~= anyway
53
54
let _ = query!("DELETE FROM records").execute(conn).await?;
54
55
55
-
let data = parse_car(&car).await?;
56
-
let data = data.chunks(DB_MAX_REQ / 4);
56
+
let data = parse_car(&car)
57
+
.await?
58
+
.into_iter()
59
+
.map(|(collection, rkey, cid, value)| {
60
+
Ok::<_, Error>((
61
+
collection,
62
+
rkey,
63
+
cid.to_string_of_base(Base::Base32Lower)?,
64
+
value,
65
+
))
66
+
})
67
+
.collect::<Result<Vec<_>, _>>()?;
68
+
let data = data.chunks(config::DB_MAX_REQ / 4);
57
69
58
70
if let Some(time) = time {
59
71
println!("Parsed car file ({:?})", time.elapsed());
···
61
73
let time = time.map(|_| std::time::Instant::now());
62
74
63
75
for data in data {
64
-
let mut query = sqlx::QueryBuilder::new("INSERT INTO records(collection, rkey, record) ");
76
+
let mut query =
77
+
sqlx::QueryBuilder::new("INSERT INTO records(collection, rkey, cid, record) ");
65
78
query.push_values(
66
-
data,
79
+
data.to_owned(),
67
80
|mut b: sqlx::query_builder::Separated<'_, '_, Postgres, &'static str>, data| {
68
-
b.push_bind(data.0.0.clone())
69
-
.push_bind(data.0.1.clone())
70
-
.push_bind(data.1.clone());
81
+
b.push_bind(data.0)
82
+
.push_bind(data.1)
83
+
.push_bind(data.2)
84
+
.push_bind(data.3);
71
85
},
72
86
);
73
87
+15
-10
src/backfill/parse_car.rs
+15
-10
src/backfill/parse_car.rs
···
20
20
IpldToJson(#[from] crate::utils::ipld_json::Error),
21
21
#[error("Could not break {} into a collection and rkey", .0)]
22
22
MalformedRecordKey(SmolStr),
23
+
#[error("Could not generate cid for commit: {}", .0)]
24
+
Cid(#[from] ipld_core::cid::Error),
23
25
}
24
26
25
-
pub type AccountData = Vec<((String, String), Value)>;
27
+
/// Records returned by `parse_car`, one tuple per record, in the order
/// `(collection, rkey, cid, value)`.
///
/// `collection` and `rkey` are the two halves of the record key, `cid` is the
/// CID the record was looked up by, and `value` is the record after
/// conversion through `ipld_to_json_value`.
pub type AccountData = Vec<(String, String, CidGeneric<64>, Value)>;
26
28
27
29
pub async fn parse_car(car: &Car) -> Result<AccountData, Error> {
28
-
let (keys, records): (Vec<SmolStr>, Vec<CidGeneric<64>>) =
30
+
let (keys, record_cids): (Vec<SmolStr>, Vec<CidGeneric<64>>) =
29
31
car.mst.leaves().await?.into_iter().unzip();
30
32
31
33
// convert keys into (collection, rkey)
···
47
49
.collect::<Result<Vec<_>, _>>()?;
48
50
49
51
// convert records into Value
50
-
let records = &records[..];
52
+
let record_cids = &record_cids[..];
51
53
let records = car
52
54
.storage
53
-
.get_many(records)
55
+
.get_many(record_cids)
54
56
.await?
55
57
.into_iter()
56
58
.collect::<Option<Vec<_>>>()
57
-
.ok_or_else(|| Error::MissingCid)?
58
-
.into_iter()
59
-
.map(|x| {
60
-
let data = serde_ipld_dagcbor::from_slice::<Ipld>(&x)?;
59
+
.ok_or_else(|| Error::MissingCid)?;
60
+
61
+
let records = zip(records, record_cids)
62
+
.map(|(bytes, cid)| {
63
+
let data = serde_ipld_dagcbor::from_slice::<Ipld>(&bytes)?;
61
64
let value = ipld_to_json_value(&data)?;
62
-
Ok::<_, Error>(value)
65
+
Ok::<(CidGeneric<64>, Value), Error>((cid.to_owned(), value))
63
66
})
64
67
.collect::<Result<Vec<_>, _>>()?;
65
68
66
-
let data = zip(keys, records).collect::<Vec<((_, _), _)>>();
69
+
let data = zip(keys, records)
70
+
.map(|((collection, rkey), (cid, record))| (collection, rkey, cid, record))
71
+
.collect();
67
72
68
73
Ok(data)
69
74
}
+2
src/config.rs
+2
src/config.rs
···
7
7
use std::env;
8
8
use std::sync::LazyLock;
9
9
10
+
/// Budget of bind parameters for a single database statement.
///
/// The backfill divides this by the number of values bound per row to decide
/// how many rows go into each batched `INSERT`.
// NOTE(review): 65535 presumably mirrors the Postgres per-statement bind
// parameter limit — confirm against the database in use.
pub const DB_MAX_REQ: usize = 65535;
11
+
10
12
// this should be loaded before the program starts any threads
11
13
// if the initializer panics, the LazyLock is poisoned and every later access (from any thread) panics too
12
14
pub static USER: LazyLock<Did<'static>> = LazyLock::new(|| {