Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

import cars: extract links. also fmt

Changed files
+235 -79
+60 -9
Cargo.lock
···
 checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"

 [[package]]
+name = "arrayref"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb"
+
+[[package]]
 name = "arrayvec"
 version = "0.7.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd"

 [[package]]
+name = "blake3"
+version = "1.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3888aaa89e4b2a40fca9848e400f6a658a5a3978de7be858e209cafa8be9a4a0"
+dependencies = [
+ "arrayref",
+ "arrayvec",
+ "cc",
+ "cfg-if",
+ "constant_time_eq",
+]
+
+[[package]]
 name = "block-buffer"
 version = "0.10.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]

 [[package]]
+name = "cbor4ii"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b28d2802395e3bccd95cc4ae984bff7444b6c1f5981da46a41360c42a2c7e2d9"
+
+[[package]]
 name = "cc"
 version = "1.2.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2f421161cb492475f1661ddc9815a745a1c894592070661180fdec3d4872e9c3"
+
+[[package]]
+name = "constant_time_eq"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"

 [[package]]
 name = "constellation"
···
 ]

 [[package]]
+name = "dasl"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b59666035a4386b0fd272bd78da4cbc3ccb558941e97579ab00f0eb4639f2a49"
+dependencies = [
+ "blake3",
+ "cbor4ii 1.2.0",
+ "data-encoding",
+ "data-encoding-macro",
+ "scopeguard",
+ "serde",
+ "serde_bytes",
+ "sha2",
+ "thiserror 2.0.17",
+]
+
+[[package]]
 name = "data-encoding"
-version = "2.8.0"
+version = "2.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "575f75dfd25738df5b91b8e43e14d44bda14637a58fae779fd2b064f8bf3e010"
+checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476"

 [[package]]
 name = "data-encoding-macro"
-version = "0.1.17"
+version = "0.1.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f9724adfcf41f45bf652b3995837669d73c4d49a1b5ac1ff82905ac7d9b5558"
+checksum = "47ce6c96ea0102f01122a185683611bd5ac8d99e62bc59dd12e6bda344ee673d"
 dependencies = [
  "data-encoding",
  "data-encoding-macro-internal",
···
 [[package]]
 name = "data-encoding-macro-internal"
-version = "0.1.15"
+version = "0.1.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f"
+checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976"
 dependencies = [
  "data-encoding",
  "syn 2.0.106",
···
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "dasl",
  "fluent-uri",
  "nom",
+ "serde",
  "thiserror 2.0.17",
  "tinyjson",
 ]
···
 [[package]]
 name = "repo-stream"
-version = "0.2.1"
+version = "0.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "727a78c392bd51b1af938e4383f2f6f46ae727cb38394136d1aebab0633faf8e"
+checksum = "093b48e604c138949bf3d4a1a9bc1165feb1db28a73af0101c84eb703d279f43"
 dependencies = [
  "bincode 2.0.1",
  "futures",
···
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "46182f4f08349a02b45c998ba3215d3f9de826246ba02bb9dddfe9a2a2100778"
 dependencies = [
- "cbor4ii",
+ "cbor4ii 0.2.14",
  "ipld-core",
  "scopeguard",
  "serde",
···
  "async-trait",
  "clap",
  "ctrlc",
+ "dasl",
  "dropshot",
  "env_logger",
  "fjall 3.0.0-pre.0",
+2
links/Cargo.toml
···
 [dependencies]
 anyhow = "1.0.95"
+dasl = "0.2.0"
 fluent-uri = "0.3.2"
 nom = "7.1.3"
+serde = { version = "1.0.228", features = ["derive"] }
 thiserror = "2.0.9"
 tinyjson = "2.5.1"
+3 -2
links/src/lib.rs
···
 use fluent_uri::Uri;
+use serde::{Deserialize, Serialize};

 pub mod at_uri;
 pub mod did;
···
 pub use record::collect_links;

-#[derive(Debug, Clone, Ord, Eq, PartialOrd, PartialEq)]
+#[derive(Debug, Clone, Ord, Eq, PartialOrd, PartialEq, Serialize, Deserialize)]
 pub enum Link {
     AtUri(String),
     Uri(String),
···
     }
 }

-#[derive(Debug, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct CollectedLink {
     pub path: String,
     pub target: Link,
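A quick illustration of what the new derives enable: `Link` and `CollectedLink` can now round-trip through any serde format, which is what lets repo-stream spill processed values to disk (presumably via bincode, which appears in its dependency list). A minimal sketch — serde_json is not a dependency of the links crate and stands in here for whatever format a consumer picks; the path and URI are invented:

use links::{CollectedLink, Link};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let link = CollectedLink {
        path: ".subject.uri".to_string(),
        target: Link::AtUri("at://did:plc:example/app.bsky.feed.post/3k2a".to_string()),
    };
    // Serialize and deserialize, then compare.
    let encoded = serde_json::to_string(&link)?;
    let decoded: CollectedLink = serde_json::from_str(&encoded)?;
    assert_eq!(link, decoded); // possible thanks to the new Clone + PartialEq derives
    Ok(())
}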
+41
links/src/record.rs
···
+use dasl::drisl::Value as DrislValue;
 use tinyjson::JsonValue;

 use crate::{parse_any_link, CollectedLink};
···
     }
 }

+pub fn walk_drisl(path: &str, v: &DrislValue, found: &mut Vec<CollectedLink>) {
+    match v {
+        DrislValue::Map(o) => {
+            for (key, child) in o {
+                walk_drisl(&format!("{path}.{key}"), child, found)
+            }
+        }
+        DrislValue::Array(a) => {
+            for child in a {
+                let child_p = match child {
+                    DrislValue::Map(o) => {
+                        if let Some(DrislValue::Text(t)) = o.get("$type") {
+                            format!("{path}[{t}]")
+                        } else {
+                            format!("{path}[]")
+                        }
+                    }
+                    _ => format!("{path}[]"),
+                };
+                walk_drisl(&child_p, child, found)
+            }
+        }
+        DrislValue::Text(s) => {
+            if let Some(link) = parse_any_link(s) {
+                found.push(CollectedLink {
+                    path: path.to_string(),
+                    target: link,
+                });
+            }
+        }
+        _ => {}
+    }
+}
+
 pub fn collect_links(v: &JsonValue) -> Vec<CollectedLink> {
     let mut found = vec![];
     walk_record("", v, &mut found);
     found
 }
+
+pub fn collect_links_drisl(v: &DrislValue) -> Vec<CollectedLink> {
+    let mut found = vec![];
+    walk_drisl("", v, &mut found);
+    found
+}
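To make the path convention concrete: `walk_drisl` mirrors the existing JSON walker, dotting into map keys and tagging array elements with `[]`, or `[$type]` when the element is a typed object. A hedged sketch of the intended call pattern (the record shape and URI in the comment are invented; error handling assumes drisl's error type implements std::error::Error):

// For a record like:
//   { "embed": { "images": [ { "$type": "app.bsky.embed.images#image",
//                              "fullsize": "https://example.com/img.png" } ] } }
// collect_links_drisl would yield one CollectedLink with
//   path:   ".embed.images[app.bsky.embed.images#image].fullsize"
//   target: Link::Uri("https://example.com/img.png")
fn links_in_block(block: &[u8]) -> Result<Vec<links::CollectedLink>, Box<dyn std::error::Error>> {
    let value: dasl::drisl::Value = dasl::drisl::from_slice(block)?;
    Ok(links::record::collect_links_drisl(&value))
}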
+2 -1
spacedust/Cargo.toml
···
 async-trait = "0.1.88"
 clap = { version = "4.5.40", features = ["derive"] }
 ctrlc = "3.4.7"
+dasl = "0.2.0"
 dropshot = "0.16.2"
 env_logger = "0.11.8"
 fjall = "3.0.0-pre.0"
···
 metrics = "0.24.2"
 metrics-exporter-prometheus = { version = "0.17.1", features = ["http-listener"] }
 rand = "0.9.1"
-repo-stream = "0.2.1"
+repo-stream = "0.2.2"
 reqwest = { version = "0.12.24", features = ["json", "stream"] }
 schemars = "0.8.22"
 semver = "1.0.26"
+1 -4
spacedust/src/bin/import_car_file.rs
···
 use clap::Parser;
 use std::path::PathBuf;
-use spacedust::storage::car;

 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
     let Args { file } = Args::parse();

-    let reader = tokio::fs::File::open(file).await?;
-
-    car::import(reader).await?;
+    let _reader = tokio::fs::File::open(file).await?;

     Ok(())
 }
+108 -44
spacedust/src/bin/import_scraped.rs
···
 use clap::Parser;
-use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
+use links::CollectedLink;
+use repo_stream::{
+    DiskBuilder, DiskStore, Driver, DriverBuilder, Processable, drive::DriverBuilderWithProcessor,
+    drive::NeedDisk,
+};
 use std::path::PathBuf;
-use tokio::{task::JoinSet, io::AsyncRead};
-use repo_stream::{DriverBuilder, Driver, DiskBuilder, DiskStore, drive::NeedDisk};
+use std::sync::{
+    Arc,
+    atomic::{AtomicUsize, Ordering},
+};
+use tokio::{io::AsyncRead, task::JoinSet};

 type Result<T> = anyhow::Result<T>; //std::result::Result<T, Box<dyn std::error::Error>>;

+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+struct CollectedProcessed(CollectedLink);
+
+impl Processable for CollectedProcessed {
+    fn get_size(&self) -> usize {
+        self.0.path.capacity() + self.0.target.as_str().len()
+    }
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+struct ErrString(String);
+
+impl Processable for ErrString {
+    fn get_size(&self) -> usize {
+        self.0.capacity()
+    }
+}
+
+type Processed = std::result::Result<Vec<CollectedProcessed>, ErrString>;
+
+/// hacky for now: put errors in strings 🤷‍♀️
+fn process(block: Vec<u8>) -> Processed {
+    let value: dasl::drisl::Value = dasl::drisl::from_slice(&block)
+        .map_err(|e| ErrString(format!("failed to parse block with drisl: {e:?}")))?;
+    let links = links::record::collect_links_drisl(&value)
+        .into_iter()
+        .map(CollectedProcessed)
+        .collect();
+    Ok(links)
+}
+
 #[derive(Debug, Parser)]
 struct Args {
···
     disk_folder: PathBuf,
 }

-async fn get_cars(cars_folder: PathBuf, tx: async_channel::Sender<tokio::io::BufReader<tokio::fs::File>>) -> Result<()> {
+async fn get_cars(
+    cars_folder: PathBuf,
+    tx: async_channel::Sender<tokio::io::BufReader<tokio::fs::File>>,
+) -> Result<()> {
     let mut dir = tokio::fs::read_dir(cars_folder).await?;
     while let Some(entry) = dir.next_entry().await? {
         if !entry.file_type().await?.is_file() {
···
 async fn drive_mem<R: AsyncRead + Unpin + Send + Sync + 'static>(
     f: R,
-    disk_tx: &async_channel::Sender<NeedDisk<R, usize>>,
-) -> Result<Option<usize>> {
+    builder: &DriverBuilderWithProcessor<Processed>,
+    disk_tx: &async_channel::Sender<NeedDisk<R, Processed>>,
+) -> Result<Option<(usize, usize)>> {
     let mut n = 0;
-    match DriverBuilder::new()
-        .with_block_processor(|_| 0_usize) // don't care just counting records
-        .with_mem_limit_mb(32)
-        .load_car(f)
-        .await?
-    {
+    let mut n_records = 0;
+    match builder.load_car(f).await? {
         Driver::Memory(_commit, mut driver) => {
             while let Some(chunk) = driver.next_chunk(512).await? {
-                n += chunk.len();
+                n_records += chunk.len();
+                for (_key, links) in chunk {
+                    match links {
+                        Ok(links) => n += links.len(),
+                        Err(e) => eprintln!("wat: {e:?}"),
+                    }
+                }
             }
-            Ok(Some(n))
+            Ok(Some((n, n_records)))
         }
         Driver::Disk(need_disk) => {
             disk_tx.send(need_disk).await?;
···
 async fn mem_worker<R: AsyncRead + Unpin + Send + Sync + 'static>(
     car_rx: async_channel::Receiver<R>,
-    disk_tx: async_channel::Sender<NeedDisk<R, usize>>,
+    disk_tx: async_channel::Sender<NeedDisk<R, Processed>>,
     n: Arc<AtomicUsize>,
+    n_records: Arc<AtomicUsize>,
 ) -> Result<()> {
+    let builder = DriverBuilder::new()
+        .with_block_processor(process) // extract links from every record block
+        .with_mem_limit_mb(128);
     while let Ok(f) = car_rx.recv().await {
-        let driven = match drive_mem(f, &disk_tx).await {
+        let driven = match drive_mem(f, &builder, &disk_tx).await {
             Ok(d) => d,
             Err(e) => {
                 eprintln!("failed to drive mem: {e:?}. skipping...");
                 continue;
             }
         };
-        if let Some(drove) = driven {
+        if let Some((drove, recs)) = driven {
             n.fetch_add(drove, Ordering::Relaxed);
+            n_records.fetch_add(recs, Ordering::Relaxed);
         }
     }
     Ok(())
 }

 async fn drive_disk<R: AsyncRead + Unpin>(
-    needed: NeedDisk<R, usize>,
+    needed: NeedDisk<R, Processed>,
     store: DiskStore,
-) -> Result<(usize, DiskStore)> {
+) -> Result<(usize, usize, DiskStore)> {
     let (_commit, mut driver) = needed.finish_loading(store).await?;
     let mut n = 0;
+    let mut n_records = 0;
     while let Some(chunk) = driver.next_chunk(512).await? {
-        n += chunk.len();
+        n_records += chunk.len();
+        for (_key, links) in chunk {
+            match links {
+                Ok(links) => n += links.len(),
+                Err(e) => eprintln!("wat: {e:?}"),
+            }
+        }
     }
     let store = driver.reset_store().await?;
-    Ok((n, store))
+    Ok((n, n_records, store))
 }

 async fn disk_worker<R: AsyncRead + Unpin>(
     worker_id: usize,
-    disk_rx: async_channel::Receiver<NeedDisk<R, usize>>,
+    disk_rx: async_channel::Receiver<NeedDisk<R, Processed>>,
     folder: PathBuf,
     n: Arc<AtomicUsize>,
+    n_records: Arc<AtomicUsize>,
     disk_workers_active: Arc<AtomicUsize>,
 ) -> Result<()> {
     let mut file = folder;
     file.push(format!("disk-worker-{worker_id}.sqlite"));
-    let mut store = DiskBuilder::new()
-        .with_cache_size_mb(128)
-        .open(file.clone())
-        .await?;
+    let builder = DiskBuilder::new().with_cache_size_mb(128);
+    let mut store = builder.open(file.clone()).await?;
     while let Ok(needed) = disk_rx.recv().await {
         let active = disk_workers_active.fetch_add(1, Ordering::AcqRel);
         println!("-> disk workers active: {}", active + 1);
-        let drove = match drive_disk(needed, store).await {
-            Ok((d, s)) => {
+        let (drove, records) = match drive_disk(needed, store).await {
+            Ok((d, r, s)) => {
                 store = s;
-                d
+                (d, r)
             }
             Err(e) => {
                 eprintln!("failed to drive disk: {e:?}. skipping...");
-                store = DiskBuilder::new()
-                    .with_cache_size_mb(128)
-                    .open(file.clone())
-                    .await?;
+                store = builder.open(file.clone()).await?;
                 continue;
             }
         };
         n.fetch_add(drove, Ordering::Relaxed);
+        n_records.fetch_add(records, Ordering::Relaxed);
         let were_active = disk_workers_active.fetch_sub(1, Ordering::AcqRel);
         println!("<- disk workers active: {}", were_active - 1);
     }
     Ok(())
 }

 #[tokio::main]
 async fn main() -> Result<()> {
     env_logger::init();

-    let Args { cars_folder, disk_folder, disk_workers, mem_workers } = Args::parse();
+    let Args {
+        cars_folder,
+        disk_folder,
+        disk_workers,
+        mem_workers,
+    } = Args::parse();

     let mut set = JoinSet::<Result<()>>::new();

     let (cars_tx, cars_rx) = async_channel::bounded(2);
     set.spawn(get_cars(cars_folder, cars_tx));

     let n: Arc<AtomicUsize> = Arc::new(0.into());
+    let n_records: Arc<AtomicUsize> = Arc::new(0.into());
     let disk_workers_active: Arc<AtomicUsize> = Arc::new(0.into());

     set.spawn({
         let n = n.clone();
+        let n_records = n_records.clone();
         let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
         async move {
             let mut last_n = n.load(Ordering::Relaxed);
+            let mut last_n_records = n_records.load(Ordering::Relaxed);
             loop {
                 interval.tick().await;
                 let n = n.load(Ordering::Relaxed);
-                let diff = n - last_n;
-                println!("rate: {} rec/sec", diff / 10);
-                if diff == 0 {
+                let n_records = n_records.load(Ordering::Relaxed);
+                let diff_n = n - last_n;
+                let diff_records = n_records - last_n_records;
+                println!("rate: {} rec/sec; {} links/sec", diff_records / 10, diff_n / 10);
+                if n_records > 0 && diff_records == 0 {
                     println!("zero encountered, stopping rate calculation polling.");
                     break Ok(());
                 }
                 last_n = n;
+                last_n_records = n_records;
             }
         }
     });

     let (needs_disk_tx, needs_disk_rx) = async_channel::bounded(disk_workers);

     for _ in 0..mem_workers {
-        set.spawn(mem_worker(cars_rx.clone(), needs_disk_tx.clone(), n.clone()));
+        set.spawn(mem_worker(
+            cars_rx.clone(),
+            needs_disk_tx.clone(),
+            n.clone(),
+            n_records.clone(),
+        ));
     }
     drop(cars_rx);
     drop(needs_disk_tx);
···
             needs_disk_rx.clone(),
             disk_folder.clone(),
             n.clone(),
+            n_records.clone(),
             disk_workers_active.clone(),
         ));
     }
···
         println!("task from set joined: {res:?}");
     }

-    eprintln!("total records processed: {n:?}");
+    eprintln!("total records processed: {n_records:?}; total links: {n:?}");

     Ok(())
 }
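A round-trip sanity check for the new `process` step could live in this binary as a unit test. This is a sketch under stated assumptions: it encodes a made-up record with serde_ipld_dagcbor (present in the workspace lockfile but it would need to be added as a dev-dependency of spacedust), and it assumes drisl accepts such a minimal dag-cbor map and that parse_any_link recognizes the at:// URI:

#[cfg(test)]
mod tests {
    use super::process;
    use std::collections::BTreeMap;

    #[test]
    fn process_collects_an_at_uri() {
        // Hypothetical record: one top-level text field holding an at-uri.
        let mut record = BTreeMap::new();
        record.insert("subject", "at://did:plc:example/app.bsky.feed.post/3k2a");
        let block = serde_ipld_dagcbor::to_vec(&record).expect("encode dag-cbor");
        let links = process(block).expect("drisl should parse the block");
        assert_eq!(links.len(), 1);
        // walk_drisl names top-level map keys ".<key>"
        assert_eq!(links[0].0.path, ".subject");
    }
}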
+12 -13
spacedust/src/bin/scrape_pds.rs
···
-use tokio::io::AsyncWriteExt;
 use clap::Parser;
-use std::path::PathBuf;
 use reqwest::Url;
-use tokio::{sync::mpsc, time};
 use serde::Deserialize;
+use std::path::PathBuf;
+use tokio::io::AsyncWriteExt;
+use tokio::{sync::mpsc, time};

 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
     pds.set_path("/xrpc/com.atproto.sync.getRepo");
     pds.set_query(Some(&format!("did={did}")));
-    let mut byte_stream = client
-        .get(pds)
-        .send()
-        .await?
-        .bytes_stream();
+    let mut byte_stream = client.get(pds).send().await?.bytes_stream();

     while let Some(stuff) = byte_stream.next().await {
         tokio::io::copy(&mut stuff?.as_ref(), &mut w).await?;
···

     Ok(())
 }
-

 #[derive(Debug, Deserialize)]
 struct RepoInfo {
···
         .expect("json response");
     for repo in res.repos {
         if repo.active {
-            tx.send(repo.did).await.expect("to be able to send on the channel");
+            tx.send(repo.did)
+                .await
+                .expect("to be able to send on the channel");
         }
     }
     cursor = res.cursor;
···
             break;
         }
     }
-
     });
     rx
 }
-

 #[tokio::main]
 async fn main() -> Result<()> {
     env_logger::init();

-    let Args { pds, throttle_ms, folder } = Args::parse();
+    let Args {
+        pds,
+        throttle_ms,
+        folder,
+    } = Args::parse();

     tokio::fs::create_dir_all(folder.clone()).await?;
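For reference, the getRepo URL assembled above takes this shape; a small self-contained check using only reqwest's Url API (host and DID are invented):

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut pds = reqwest::Url::parse("https://pds.example.com")?;
    pds.set_path("/xrpc/com.atproto.sync.getRepo");
    pds.set_query(Some("did=did:plc:example"));
    assert_eq!(
        pds.as_str(),
        "https://pds.example.com/xrpc/com.atproto.sync.getRepo?did=did:plc:example"
    );
    Ok(())
}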
+1
spacedust/src/storage/car/mod.rs
+
+4 -3
spacedust/src/storage/fjall/mod.rs
···
 use crate::storage::Storage;

-struct FjallStorage {
-}
+pub struct FjallStorage {}

 impl Storage for FjallStorage {
-    fn import_car() { todo!() }
+    fn import_car() {
+        todo!()
+    }
 }
+1 -3
spacedust/src/storage/mod.rs
···
 pub mod fjall;

 pub trait Storage {
-    fn import_car() {
-
-    }
+    fn import_car() {}
 }