+5
-5
src/backfill/load_car.rs
+5
-5
src/backfill/load_car.rs
···
11
11
#[derive(Error, Debug)]
12
12
pub enum Error {
13
13
#[error("Error loading car file: {}", .0)]
14
-
ClientError(#[from] jacquard::error::ClientError),
14
+
Client(#[from] jacquard::error::ClientError),
15
15
#[error("Error loading car file: {}", .0)]
16
-
RepoError(#[from] jacquard_repo::RepoError),
16
+
Repo(#[from] jacquard_repo::RepoError),
17
17
#[error("Missing root block from car file (malformed car file)")]
18
18
MissingRoot,
19
19
}
···
33
33
use std::cmp::Ordering;
34
34
impl PartialOrd<Tid> for Car {
35
35
fn partial_cmp(&self, other: &Tid) -> Option<Ordering> {
36
-
return match self.rev.compare_to(other) {
36
+
match self.rev.compare_to(other) {
37
37
1 => Some(Ordering::Greater),
38
38
0 => Some(Ordering::Equal),
39
39
-1 => Some(Ordering::Less),
40
40
_ => None,
41
-
};
41
+
}
42
42
}
43
43
}
44
44
···
50
50
let res = agent.xrpc(pds).send(&req).await?;
51
51
52
52
let car = res.buffer();
53
-
let car = jacquard_repo::car::parse_car_bytes(&car).await?;
53
+
let car = jacquard_repo::car::parse_car_bytes(car).await?;
54
54
55
55
let storage = jacquard_repo::storage::MemoryBlockStore::new_from_blocks(car.blocks);
56
56
+17
-23
src/backfill/mod.rs
+17
-23
src/backfill/mod.rs
···
27
27
#[error("Error parsing TID: {}", .0)]
28
28
TidParse(#[from] jacquard::types::string::AtStrError),
29
29
#[error("{}", .0)]
30
-
GetCarError(#[from] crate::backfill::load_car::Error),
30
+
GetCar(#[from] crate::backfill::load_car::Error),
31
31
#[error(
32
32
"The database claims to be more up to date than the PDS.
33
33
Most likely either the PDS or repo is broken, or the database has been corrupted.
···
35
35
)]
36
36
DbTidTooLow,
37
37
#[error("Database error: {}", .0)]
38
-
DbError(#[from] sqlx::Error),
38
+
Db(#[from] sqlx::Error),
39
39
#[error("{}", .0)]
40
-
ParseCarError(#[from] crate::backfill::parse_car::Error),
40
+
ParseCar(#[from] crate::backfill::parse_car::Error),
41
41
}
42
42
43
43
pub async fn backfill(
···
92
92
let _ = query!("DELETE FROM records").execute(conn).await?;
93
93
94
94
let data = parse_car(&car).await?;
95
-
let mut data = data.chunks(DB_MAX_REQ / 4);
95
+
let data = data.chunks(DB_MAX_REQ / 4);
96
96
97
97
if let Some(time) = time {
98
98
println!("Parsed car file ({:?})", time.elapsed());
99
99
}
100
100
let time = time.map(|_| std::time::Instant::now());
101
101
102
-
while let Some(data) = data.next() {
102
+
for data in data {
103
103
let mut query = sqlx::QueryBuilder::new("INSERT INTO records(collection, rkey, record) ");
104
104
query.push_values(
105
105
data,
···
110
110
},
111
111
);
112
112
113
-
match query.build().execute(conn).await {
114
-
Err(err) => {
115
-
// couldnt backfill so go nuclear
116
-
// this is program startup so its prolly safe lol
117
-
println!("Got error \"{}\"\nDeleting records and exiting...", err);
118
-
let _ = query!("DELETE FROM records").execute(conn).await?;
119
-
panic!()
120
-
}
121
-
_ => {}
113
+
if let Err(err) = query.build().execute(conn).await {
114
+
// couldnt backfill so go nuclear
115
+
// this is program startup so its prolly safe lol
116
+
println!("Got error \"{}\"\nDeleting records and exiting...", err);
117
+
let _ = query!("DELETE FROM records").execute(conn).await?;
118
+
panic!()
122
119
};
123
120
}
124
121
···
126
123
println!("Saved to database ({:?})", time.elapsed());
127
124
}
128
125
129
-
match query!(
126
+
if let Err(err) = query!(
130
127
"UPDATE meta SET rev = $1 WHERE did = $2",
131
128
car.rev.to_string(),
132
129
config::USER.to_string()
···
134
131
.execute(conn)
135
132
.await
136
133
{
137
-
Err(err) => {
138
-
// couldnt save tid so go nuclear
139
-
// this is program startup so its prolly safe lol
140
-
println!("Got error \"{}\"\nDeleting records and exiting...", err);
141
-
let _ = query!("DELETE FROM records").execute(conn).await?;
142
-
panic!()
143
-
}
144
-
_ => {}
134
+
// couldnt save tid so go nuclear
135
+
// this is program startup so its prolly safe lol
136
+
println!("Got error \"{}\"\nDeleting records and exiting...", err);
137
+
let _ = query!("DELETE FROM records").execute(conn).await?;
138
+
panic!()
145
139
};
146
140
147
141
Ok(())
+3
-3
src/backfill/parse_car.rs
+3
-3
src/backfill/parse_car.rs
···
11
11
#[derive(Debug, Error)]
12
12
pub enum Error {
13
13
#[error("Error getting records from car file: {}", .0)]
14
-
RepoError(#[from] jacquard_repo::RepoError),
14
+
Repo(#[from] jacquard_repo::RepoError),
15
15
#[error("Missing CID from car file")]
16
16
MissingCid,
17
17
#[error("Could not decode record: {}", .0)]
18
-
DecodeError(#[from] serde_ipld_dagcbor::DecodeError<std::convert::Infallible>),
18
+
Decode(#[from] serde_ipld_dagcbor::DecodeError<std::convert::Infallible>),
19
19
#[error("Could not convert into json: {}", .0)]
20
-
IpldToJsonError(#[from] crate::utils::ipld_json::Error),
20
+
IpldToJson(#[from] crate::utils::ipld_json::Error),
21
21
#[error("Could not break {} into a collection and rkey", .0)]
22
22
MalformedRecordKey(SmolStr),
23
23
}
+8
-8
src/utils/ipld_json.rs
+8
-8
src/utils/ipld_json.rs
···
17
17
#[derive(Error, Debug)]
18
18
pub enum Error {
19
19
#[error("CID error: {}", .0)]
20
-
CidError(#[from] ipld_core::cid::Error),
20
+
Cid(#[from] ipld_core::cid::Error),
21
21
#[error("Number too big: {0} > {1} || {0} < {2}", .val, u64::MAX, i64::MIN)]
22
22
IntInvalidSize { val: i128 },
23
23
#[error("{} was NaN or Infinity", .0)]
···
38
38
Ok(match data {
39
39
Ipld::Null => Value::Null,
40
40
Ipld::Bool(bool) => Value::Bool(*bool),
41
-
Ipld::Integer(int) => Value::Number(
42
-
Number::from_i128(*int).ok_or_else(|| Error::IntInvalidSize { val: *int })?,
43
-
),
41
+
Ipld::Integer(int) => {
42
+
Value::Number(Number::from_i128(*int).ok_or(Error::IntInvalidSize { val: *int })?)
43
+
}
44
44
Ipld::Float(float) => {
45
45
warn!("Got float in IPLD data: {}", float);
46
-
Value::Number(Number::from_f64(*float).ok_or_else(|| Error::FloatInvalidSize(*float))?)
46
+
Value::Number(Number::from_f64(*float).ok_or(Error::FloatInvalidSize(*float))?)
47
47
}
48
48
Ipld::String(str) => Value::String(str.clone()),
49
49
Ipld::Bytes(items) => json!({ "$bytes": BASE64_STANDARD_NO_PAD.encode(items) }),
50
50
Ipld::List(iplds) => Value::Array(
51
51
iplds
52
-
.into_iter()
53
-
.map(|x| ipld_to_json_value(x))
52
+
.iter()
53
+
.map(ipld_to_json_value)
54
54
.collect::<Result<Vec<_>, _>>()?,
55
55
),
56
56
Ipld::Map(map) => Value::Object(
57
-
map.into_iter()
57
+
map.iter()
58
58
.map(|(k, v)| Ok::<_, Error>((k.clone(), ipld_to_json_value(v)?)))
59
59
.collect::<Result<Map<String, Value>, _>>()?,
60
60
),
+10
-4
src/utils/resolver.rs
+10
-4
src/utils/resolver.rs
···
5
5
use thiserror::Error;
6
6
7
7
#[derive(Debug, Error)]
8
-
pub enum ResolveError {
8
+
pub enum Error {
9
9
#[error("Identity error: {}", .0)]
10
-
IdentityError(#[from] jacquard::identity::resolver::IdentityError),
10
+
IdentityError(Box<jacquard::identity::resolver::IdentityError>),
11
11
#[error("Missing domain")]
12
12
MissingDomain,
13
13
}
14
14
15
-
pub async fn resolve(did: &Did<'_>) -> Result<String, ResolveError> {
15
+
impl From<jacquard::identity::resolver::IdentityError> for Error {
16
+
fn from(value: jacquard::identity::resolver::IdentityError) -> Self {
17
+
Self::IdentityError(Box::new(value))
18
+
}
19
+
}
20
+
21
+
pub async fn resolve(did: &Did<'_>) -> Result<String, Error> {
16
22
// resolve did to pds
17
23
let resolver = jacquard::identity::PublicResolver::default();
18
24
let pds = resolver.pds_for_did(did).await?;
19
25
let Some(pds) = pds.domain() else {
20
-
return Err(ResolveError::MissingDomain);
26
+
return Err(Error::MissingDomain);
21
27
};
22
28
Ok(String::from(pds))
23
29
}