Tracks lexicons and how many times they have appeared on the Jetstream.
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(server): make db configurable, improve migration perf

ptr.pet 11924417 4c4a5aa3

verified
+70 -25
+42 -15
server/src/db/mod.rs
··· 3 3 fmt::Debug, 4 4 io::Cursor, 5 5 ops::{Bound, Deref, RangeBounds}, 6 - path::Path, 6 + path::{Path, PathBuf}, 7 7 time::Duration, 8 8 }; 9 9 ··· 75 75 pub disk_size: u64, 76 76 } 77 77 78 + pub struct DbConfig { 79 + pub ks_config: fjall::Config, 80 + pub min_block_size: usize, 81 + pub max_block_size: usize, 82 + pub max_last_activity: u64, 83 + } 84 + 85 + impl DbConfig { 86 + pub fn path(mut self, path: impl AsRef<Path>) -> Self { 87 + self.ks_config = fjall::Config::new(path); 88 + self 89 + } 90 + 91 + pub fn ks(mut self, f: impl FnOnce(fjall::Config) -> fjall::Config) -> Self { 92 + self.ks_config = f(self.ks_config); 93 + self 94 + } 95 + } 96 + 97 + impl Default for DbConfig { 98 + fn default() -> Self { 99 + Self { 100 + ks_config: fjall::Config::default(), 101 + min_block_size: 512, 102 + max_block_size: 500_000, 103 + max_last_activity: Duration::from_secs(10).as_nanos() as u64, 104 + } 105 + } 106 + } 107 + 78 108 // counts is nsid -> NsidCounts 79 109 // hits is tree per nsid: varint start time + varint end time -> block of hits 80 110 pub struct Db { 81 - ks: Keyspace, 111 + pub cfg: DbConfig, 112 + pub ks: Keyspace, 82 113 counts: Partition, 83 114 hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>, 84 115 sync_pool: threadpool::ThreadPool, 85 116 event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 86 117 eps: RateTracker<100>, 87 118 cancel_token: CancellationToken, 88 - pub min_block_size: usize, 89 - pub max_block_size: usize, 90 - pub max_last_activity: u64, 91 119 } 92 120 93 121 impl Db { 94 - pub fn new(path: impl AsRef<Path>, cancel_token: CancellationToken) -> AppResult<Self> { 122 + pub fn new(cfg: DbConfig, cancel_token: CancellationToken) -> AppResult<Self> { 95 123 tracing::info!("opening db..."); 96 - let ks = Config::new(path).open()?; 124 + let ks = cfg.ks_config.clone().open()?; 97 125 Ok(Self { 126 + cfg, 98 127 hits: Default::default(), 99 128 sync_pool: threadpool::Builder::new() 100 129 
.num_threads(rayon::current_num_threads() * 2) ··· 107 136 event_broadcaster: broadcast::channel(1000).0, 108 137 eps: RateTracker::new(Duration::from_secs(1)), 109 138 cancel_token, 110 - min_block_size: 512, 111 - max_block_size: 500_000, 112 - max_last_activity: Duration::from_secs(10).as_nanos() as u64, 113 139 }) 114 140 } 115 141 ··· 140 166 for (_, handle) in self.hits.iter(&_guard) { 141 167 let mut nsid_data = Vec::with_capacity(2); 142 168 let mut total_count = 0; 143 - let is_too_old = handle.since_last_activity() > self.max_last_activity; 169 + let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity; 144 170 // if we disconnect for a long time, we want to sync all of what we 145 171 // have to avoid having many small blocks (even if we run compaction 146 172 // later, it reduces work until we run compaction) 147 173 let block_size = (is_too_old || all) 148 - .then_some(self.max_block_size) 174 + .then_some(self.cfg.max_block_size) 149 175 .unwrap_or_else(|| { 150 - self.max_block_size 151 - .min(self.min_block_size.max(handle.suggested_block_size())) 176 + self.cfg 177 + .max_block_size 178 + .min(self.cfg.min_block_size.max(handle.suggested_block_size())) 152 179 }); 153 180 let count = handle.item_count(); 154 181 let data_count = count / block_size; ··· 237 264 } 238 265 239 266 pub fn major_compact(&self) -> AppResult<()> { 240 - self.compact_all(self.max_block_size, .., true)?; 267 + self.compact_all(self.cfg.max_block_size, .., true)?; 241 268 let _guard = scc::ebr::Guard::new(); 242 269 for (_, handle) in self.hits.iter(&_guard) { 243 270 handle.deref().major_compact()?;
+28 -10
server/src/main.rs
··· 1 - use std::{ops::Deref, time::Duration}; 1 + use std::{ops::Deref, time::Duration, u64}; 2 2 3 3 use itertools::Itertools; 4 4 use rclite::Arc; ··· 9 9 10 10 use crate::{ 11 11 api::serve, 12 - db::{Db, EventRecord}, 12 + db::{Db, DbConfig, EventRecord}, 13 13 error::AppError, 14 14 jetstream::JetstreamClient, 15 15 utils::{CLOCK, RelativeDateTime, get_time}, ··· 57 57 58 58 let cancel_token = CancellationToken::new(); 59 59 60 - let db = 61 - Arc::new(Db::new(".fjall_data", cancel_token.child_token()).expect("couldnt create db")); 60 + let db = Arc::new( 61 + Db::new(DbConfig::default(), cancel_token.child_token()).expect("couldnt create db"), 62 + ); 62 63 63 64 rustls::crypto::ring::default_provider() 64 65 .install_default() ··· 156 157 }, 157 158 "running compaction...", 158 159 ); 159 - match db.compact_all(db.max_block_size, range, false) { 160 + match db.compact_all(db.cfg.max_block_size, range, false) { 160 161 Ok(_) => (), 161 162 Err(e) => tracing::error!("failed to compact db: {}", e), 162 163 } ··· 202 203 } 203 204 204 205 fn debug() { 205 - let db = Db::new(".fjall_data", CancellationToken::new()).expect("couldnt create db"); 206 + let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db"); 206 207 let info = db.info().expect("cant get db info"); 207 208 println!("disk size: {}", info.disk_size); 208 209 for (nsid, blocks) in info.nsids { ··· 226 227 } 227 228 228 229 fn compact() { 229 - let db = Db::new(".fjall_data", CancellationToken::new()).expect("couldnt create db"); 230 + let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db"); 230 231 let info = db.info().expect("cant get db info"); 231 232 db.major_compact().expect("cant compact"); 232 233 std::thread::sleep(Duration::from_secs(5)); ··· 247 248 fn migrate() { 248 249 let cancel_token = CancellationToken::new(); 249 250 let from = Arc::new( 250 - Db::new(".fjall_data_from", cancel_token.child_token()).expect("couldnt 
create db"), 251 + Db::new( 252 + DbConfig::default().path(".fjall_data_from"), 253 + cancel_token.child_token(), 254 + ) 255 + .expect("couldnt create db"), 251 256 ); 252 - let to = 253 - Arc::new(Db::new(".fjall_data_to", cancel_token.child_token()).expect("couldnt create db")); 257 + let to = Arc::new( 258 + Db::new( 259 + DbConfig::default().path(".fjall_data_to").ks(|c| { 260 + c.max_journaling_size(1024 * 1024 * 1024 * 8) 261 + .max_write_buffer_size(u64::MAX) 262 + .manual_journal_persist(true) 263 + }), 264 + cancel_token.child_token(), 265 + ) 266 + .expect("couldnt create db"), 267 + ); 254 268 255 269 let nsids = from.get_nsids().collect::<Vec<_>>(); 256 270 let eps_thread = std::thread::spawn({ ··· 296 310 drop(from); 297 311 tracing::info!("starting sync!!!"); 298 312 to.sync(true).expect("cant sync"); 313 + tracing::info!("persisting..."); 314 + to.ks 315 + .persist(fjall::PersistMode::SyncAll) 316 + .expect("cant persist"); 299 317 let total_time = start.elapsed(); 300 318 let write_per_second = total_count as f64 / (total_time - read_time).as_secs_f64(); 301 319 tracing::info!(