+5
build.rs
+5
build.rs
+14
migrations/0001_init.sql
+14
migrations/0001_init.sql
···
1
+
-- Add migration script here
2
+
CREATE TABLE IF NOT EXISTS records (
3
+
collection TEXT,
4
+
rkey TEXT,
5
+
record JSON NOT NULL,
6
+
PRIMARY KEY (collection, rkey)
7
+
);
8
+
9
+
CREATE TABLE IF NOT EXISTS blobs (
10
+
did TEXT,
11
+
cid TEXT,
12
+
blob bytea NOT NULL,
13
+
PRIMARY KEY (did, cid)
14
+
);
+9
-32
src/backfill/load_car.rs
+9
-32
src/backfill/load_car.rs
···
1
1
use jacquard::api::com_atproto;
2
2
use jacquard::client::Agent;
3
-
use jacquard::types::{did::Did, tid::Tid};
3
+
use jacquard::types::did::Did;
4
4
use jacquard::url::Url;
5
5
use jacquard::xrpc::XrpcExt;
6
-
use jacquard_repo::commit::Commit;
7
-
use jacquard_repo::{BlockStore, MemoryBlockStore, Mst};
6
+
use jacquard_repo::{BlockStore, MemoryBlockStore, Mst, commit::Commit};
8
7
use std::sync::Arc;
9
8
use thiserror::Error;
10
9
···
14
13
Client(#[from] jacquard::error::ClientError),
15
14
#[error("Error loading car file: {}", .0)]
16
15
Repo(#[from] jacquard_repo::RepoError),
17
-
#[error("Missing root block from car file (malformed car file)")]
16
+
#[error("Missing root from car file (malformed)")]
18
17
MissingRoot,
19
18
}
20
19
21
20
pub struct Car {
22
21
pub storage: MemoryBlockStore,
23
22
pub mst: Mst<MemoryBlockStore>,
24
-
pub rev: Tid,
25
-
}
26
-
27
-
impl PartialEq<Tid> for Car {
28
-
fn eq(&self, other: &Tid) -> bool {
29
-
&self.rev == other
30
-
}
31
-
}
32
-
33
-
use std::cmp::Ordering;
34
-
impl PartialOrd<Tid> for Car {
35
-
fn partial_cmp(&self, other: &Tid) -> Option<Ordering> {
36
-
match self.rev.compare_to(other) {
37
-
1 => Some(Ordering::Greater),
38
-
0 => Some(Ordering::Equal),
39
-
-1 => Some(Ordering::Less),
40
-
_ => None,
41
-
}
42
-
}
43
23
}
44
24
45
25
pub async fn load_car(user: Did<'_>, pds: Url) -> Result<Car, Error> {
···
54
34
55
35
let storage = jacquard_repo::storage::MemoryBlockStore::new_from_blocks(car.blocks);
56
36
57
-
let root = storage
58
-
.get(&car.root)
59
-
.await?
60
-
.ok_or_else(|| Error::MissingRoot)?;
61
-
let root = Commit::from_cbor(&root)?;
62
-
let rev = root.rev().to_owned();
63
-
let root = root.data();
37
+
let Some(root) = storage.get(&car.root).await? else {
38
+
return Err(Error::MissingRoot);
39
+
};
40
+
let root = Commit::from_cbor(&root)?.data;
64
41
65
-
let mst = Mst::load(Arc::new(storage.clone()), *root, None);
42
+
let mst = Mst::load(Arc::new(storage.clone()), root, None);
66
43
67
-
Ok(Car { storage, mst, rev })
44
+
Ok(Car { storage, mst })
68
45
}
+2
-56
src/backfill/mod.rs
+2
-56
src/backfill/mod.rs
···
6
6
//! 4. convert cbor data to json
7
7
//! 5. store in db (limit to DB_MAX_REQ / 4 to avoid err)
8
8
9
-
use std::{cmp::Ordering, str::FromStr};
9
+
use std::str::FromStr;
10
10
11
-
use jacquard::{types::tid::Tid, url::Url};
11
+
use jacquard::url::Url;
12
12
use sqlx::{Pool, Postgres, query};
13
13
use thiserror::Error;
14
14
···
28
28
TidParse(#[from] jacquard::types::string::AtStrError),
29
29
#[error("{}", .0)]
30
30
GetCar(#[from] crate::backfill::load_car::Error),
31
-
#[error(
32
-
"The database claims to be more up to date than the PDS.
33
-
Most likely either the PDS or repo is broken, or the database has been corrupted.
34
-
Check your PDS repo is working and/or drop the database."
35
-
)]
36
-
DbTidTooLow,
37
31
#[error("Database error: {}", .0)]
38
32
Db(#[from] sqlx::Error),
39
33
#[error("{}", .0)]
···
45
39
conn: &Pool<Postgres>,
46
40
time: Option<std::time::Instant>,
47
41
) -> Result<(), Error> {
48
-
let db_rev = if let Some(rev) = query!(
49
-
"SELECT (rev) FROM meta WHERE did = $1",
50
-
config::USER.to_string()
51
-
)
52
-
.fetch_one(conn)
53
-
.await
54
-
.ok()
55
-
.and_then(|x| x.rev)
56
-
{
57
-
Tid::from_str(&rev)?
58
-
} else {
59
-
Tid::from_time(0, 0)
60
-
};
61
-
62
42
let pds = Url::from_str(&format!("https://{pds}/")).unwrap();
63
43
let car = load_car(config::USER.clone(), pds).await?;
64
44
···
67
47
}
68
48
let time = time.map(|_| std::time::Instant::now());
69
49
70
-
if let Some(val) = car.partial_cmp(&db_rev) {
71
-
match val {
72
-
// car rev newer than db rev
73
-
// continue on; every other branch diverges
74
-
Ordering::Greater => {}
75
-
// revisions are the same so we can skip backfill
76
-
Ordering::Equal => return Ok(()),
77
-
// db rev newer than car rev
78
-
// this means the db or car file is borked
79
-
// panic out and let the user deal with things
80
-
Ordering::Less => return Err(Error::DbTidTooLow),
81
-
// panic!(
82
-
// r"The database claims to be more up to date than the PDS.
83
-
// Most likely either the PDS or repo is broken, or the database has been corrupted.
84
-
// Check your PDS repo is working and/or drop the database."
85
-
// ),
86
-
};
87
-
};
88
-
89
50
// erase all old records and return if it fails
90
51
// we dont use diffs bc theyre complex and the overhead is minimal rn
91
52
// only real overhead is network latency which would be ~= anyway
···
122
83
if let Some(time) = time {
123
84
println!("Saved to database ({:?})", time.elapsed());
124
85
}
125
-
126
-
if let Err(err) = query!(
127
-
"UPDATE meta SET rev = $1 WHERE did = $2",
128
-
car.rev.to_string(),
129
-
config::USER.to_string()
130
-
)
131
-
.execute(conn)
132
-
.await
133
-
{
134
-
// couldnt save tid so go nuclear
135
-
// this is program startup so its prolly safe lol
136
-
println!("Got error \"{}\"\nDeleting records and exiting...", err);
137
-
let _ = query!("DELETE FROM records").execute(conn).await?;
138
-
panic!()
139
-
};
140
86
141
87
Ok(())
142
88
}
+6
-48
src/db.rs
+6
-48
src/db.rs
···
1
1
//! create a connection pool and setup tables before making avaliable
2
2
3
3
use crate::config;
4
-
use sqlx::{Pool, Postgres, postgres::PgPool, query};
4
+
use sqlx::{Pool, Postgres, migrate, postgres::PgPool};
5
5
6
-
pub async fn init() -> Pool<Postgres> {
7
-
let conn = PgPool::connect(&config::POSTGRES_URL).await;
8
-
let conn = match conn {
6
+
pub async fn conn() -> Pool<Postgres> {
7
+
let conn = match PgPool::connect(&config::POSTGRES_URL).await {
9
8
Ok(val) => val,
10
9
Err(err) => {
11
10
println!("Could not connect to the database. Got error {err}");
···
13
12
}
14
13
};
15
14
16
-
// initialise db tables
17
-
if let Err(err) = query!(
18
-
"CREATE TABLE IF NOT EXISTS records (
19
-
collection TEXT,
20
-
rkey TEXT,
21
-
record JSON NOT NULL,
22
-
PRIMARY KEY (collection, rkey)
23
-
);"
24
-
)
25
-
.execute(&conn)
26
-
.await
27
-
{
28
-
println!("Creating table `records`: \n{err}");
29
-
panic!("Could not instantiate db");
30
-
};
31
-
32
-
if let Err(err) = query!(
33
-
"CREATE TABLE IF NOT EXISTS blobs (
34
-
did TEXT,
35
-
cid TEXT,
36
-
blob bytea NOT NULL,
37
-
PRIMARY KEY (did, cid)
38
-
)"
39
-
)
40
-
.execute(&conn)
41
-
.await
42
-
{
43
-
println!("Creating table `blobs`: \n{err}");
44
-
panic!();
45
-
};
46
-
47
-
if let Err(err) = query!(
48
-
"CREATE TABLE IF NOT EXISTS meta (
49
-
did TEXT,
50
-
rev TEXT,
51
-
PRIMARY KEY (did)
52
-
);"
53
-
)
54
-
.execute(&conn)
55
-
.await
56
-
{
57
-
println!("Creating table `meta`: \n{err}");
58
-
panic!();
59
-
};
15
+
migrate!().run(&conn).await.unwrap_or_else(|err| {
16
+
panic!("Error running migrations: {}", err);
17
+
});
60
18
61
19
conn
62
20
}
+1
-1
src/main.rs
+1
-1
src/main.rs
···
14
14
async fn main() -> Result<(), Error> {
15
15
env_logger::init();
16
16
println!("User: {}", *config::USER);
17
-
let conn: Pool<Postgres> = db::init().await;
17
+
let conn: Pool<Postgres> = db::conn().await;
18
18
println!("Database connected and initialized");
19
19
20
20
let pds = match utils::resolver::resolve(&config::USER).await {