Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Merge pull request #20 from at-microcosm/ufos-fix-data

UFOs: change counting and fix NSID encoding

authored by bad-example.com, committed by GitHub (e798135f, e798b332)

+1 -1
ufos/fuzz/fuzz_targets/counts_value.rs
```diff
···
     assert_eq!(serialized.len(), n);
     let (and_back, n_again) = CountsValue::from_db_bytes(&serialized).unwrap();
     assert_eq!(n_again, n);
-    assert_eq!(and_back.records(), counts_value.records());
+    assert_eq!(and_back.counts(), counts_value.counts());
     assert_eq!(and_back.dids().estimate(), counts_value.dids().estimate());
 }
 });
```
+96 -29
ufos/src/db_types.rs
```diff
···
 }
 
 pub trait DbBytes {
-    fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError>;
+    fn to_db_bytes(&self) -> EncodingResult<Vec<u8>>;
     fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError>
     where
         Self: Sized;
+}
+
+pub trait SubPrefixBytes<T> {
+    fn sub_prefix(input: T) -> EncodingResult<Vec<u8>>;
 }
 
 #[derive(PartialEq)]
···
     pub fn from_pair(prefix: P, suffix: S) -> Self {
         Self { prefix, suffix }
     }
-    pub fn from_prefix_to_db_bytes(prefix: &P) -> Result<Vec<u8>, EncodingError> {
+    pub fn from_prefix_to_db_bytes(prefix: &P) -> EncodingResult<Vec<u8>> {
         prefix.to_db_bytes()
     }
-    pub fn to_prefix_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
+    pub fn to_prefix_db_bytes(&self) -> EncodingResult<Vec<u8>> {
         self.prefix.to_db_bytes()
     }
-    pub fn prefix_range_end(prefix: &P) -> Result<Vec<u8>, EncodingError> {
+    pub fn prefix_range_end(prefix: &P) -> EncodingResult<Vec<u8>> {
         let prefix_bytes = prefix.to_db_bytes()?;
         let (_, Bound::Excluded(range_end)) = prefix_to_range(&prefix_bytes) else {
             return Err(EncodingError::BadRangeBound);
         };
         Ok(range_end.to_vec())
     }
-    pub fn range_end(&self) -> Result<Vec<u8>, EncodingError> {
+    pub fn range_end(&self) -> EncodingResult<Vec<u8>> {
         Self::prefix_range_end(&self.prefix)
     }
     pub fn range(&self) -> Result<Range<Vec<u8>>, EncodingError> {
···
     }
 }
 
+impl<P: DbBytes + Default, S: DbBytes + Default> Default for DbConcat<P, S> {
+    fn default() -> Self {
+        Self {
+            prefix: Default::default(),
+            suffix: Default::default(),
+        }
+    }
+}
+
 impl<P: DbBytes + std::fmt::Debug, S: DbBytes + std::fmt::Debug> fmt::Debug for DbConcat<P, S> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "DbConcat<{:?} || {:?}>", self.prefix, self.suffix)
···
 }
 
 impl<P: DbBytes, S: DbBytes> DbBytes for DbConcat<P, S> {
-    fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
+    fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> {
         let mut combined = self.prefix.to_db_bytes()?;
         combined.append(&mut self.suffix.to_db_bytes()?);
         Ok(combined)
···
 #[derive(Debug, Default, PartialEq)]
 pub struct DbEmpty(());
 impl DbBytes for DbEmpty {
-    fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
+    fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> {
         Ok(vec![])
     }
     fn from_db_bytes(_: &[u8]) -> Result<(Self, usize), EncodingError> {
···
     }
 }
 impl<S: StaticStr> DbBytes for DbStaticStr<S> {
-    fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
+    fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> {
         S::static_str().to_string().to_db_bytes()
     }
     fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
···
 where
     T: BincodeEncode + BincodeDecode<()> + UseBincodePlz + Sized + std::fmt::Debug,
 {
-    fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
+    fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> {
         Ok(encode_to_vec(self, bincode_conf())?)
     }
     fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
···
 
 /// helper trait: impl on a type to get helpers to implement DbBytes
 pub trait SerdeBytes: serde::Serialize + for<'a> serde::Deserialize<'a> {
-    fn to_bytes(&self) -> Result<Vec<u8>, EncodingError>
+    fn to_bytes(&self) -> EncodingResult<Vec<u8>>
     where
         Self: std::fmt::Debug,
     {
···
 impl<const N: usize> UseBincodePlz for [u8; N] {}
 
 impl DbBytes for Vec<u8> {
-    fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
+    fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> {
         Ok(self.to_vec())
     }
     fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
···
 /// TODO: wrap in another type. it's actually probably not desirable to serialize strings this way
 /// *except* where needed as a prefix.
 impl DbBytes for String {
-    fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
+    fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> {
         let mut v = self.as_bytes().to_vec();
         if v.contains(&0x00) {
             return Err(EncodingError::StringContainedNull);
···
             }
         }
         Err(EncodingError::UnterminatedString)
+    }
+}
+
+impl SubPrefixBytes<&str> for String {
+    fn sub_prefix(input: &str) -> EncodingResult<Vec<u8>> {
+        let v = input.as_bytes();
+        if v.contains(&0x00) {
+            return Err(EncodingError::StringContainedNull);
+        }
+        // NO null terminator!!
+        Ok(v.to_vec())
     }
 }
···
         let me = Self::new(s).map_err(EncodingError::BadAtriumStringType)?;
         Ok((me, n))
     }
-    fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
+    fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> {
         Ok(encode_to_vec(self.as_ref(), bincode_conf())?)
     }
 }
 
-// BUG: this needs to use the null-terminating string thing!!!!!!!!!!!!!! the whole point of all of this!!!!
 impl DbBytes for Nsid {
     fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
-        let (s, n) = decode_from_slice(bytes, bincode_conf())?;
+        let (s, n) = String::from_db_bytes(bytes)?; // null-terminated DbBytes impl!!
         let me = Self::new(s).map_err(EncodingError::BadAtriumStringType)?;
         Ok((me, n))
     }
-    fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
-        Ok(encode_to_vec(self.as_ref(), bincode_conf())?)
+    fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> {
+        String::to_db_bytes(&self.to_string()) // null-terminated DbBytes impl!!!!
+    }
+}
+impl SubPrefixBytes<&str> for Nsid {
+    fn sub_prefix(input: &str) -> EncodingResult<Vec<u8>> {
+        String::sub_prefix(input)
     }
 }
···
         let me = Self::new(s).map_err(EncodingError::BadAtriumStringType)?;
         Ok((me, n))
     }
-    fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
+    fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> {
         Ok(encode_to_vec(self.as_ref(), bincode_conf())?)
     }
 }
 
 impl DbBytes for Cursor {
-    fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
+    fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> {
         Ok(self.to_raw_u64().to_be_bytes().to_vec())
     }
     fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
···
 }
 
 impl DbBytes for serde_json::Value {
-    fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
+    fn to_db_bytes(&self) -> EncodingResult<Vec<u8>> {
         self.to_string().to_db_bytes()
     }
     fn from_db_bytes(bytes: &[u8]) -> Result<(Self, usize), EncodingError> {
···
 
 #[cfg(test)]
 mod test {
-    use super::{Cursor, DbBytes, DbConcat, DbEmpty, DbStaticStr, EncodingError, StaticStr};
+    use super::{
+        Cursor, DbBytes, DbConcat, DbEmpty, DbStaticStr, EncodingResult, Nsid, StaticStr,
+        SubPrefixBytes,
+    };
 
     #[test]
-    fn test_db_empty() -> Result<(), EncodingError> {
+    fn test_db_empty() -> EncodingResult<()> {
         let original = DbEmpty::default();
         let serialized = original.to_db_bytes()?;
         assert_eq!(serialized.len(), 0);
···
 
     #[test]
-    fn test_string_roundtrip() -> Result<(), EncodingError> {
+    fn test_string_roundtrip() -> EncodingResult<()> {
         for (case, desc) in [
             ("", "empty string"),
             ("a", "basic string"),
···
 
     #[test]
-    fn test_string_serialized_lexicographic_sort() -> Result<(), EncodingError> {
+    fn test_string_serialized_lexicographic_sort() -> EncodingResult<()> {
         let aa = "aa".to_string().to_db_bytes()?;
         let b = "b".to_string().to_db_bytes()?;
         assert!(b > aa);
···
 
     #[test]
-    fn test_string_cursor_prefix_roundtrip() -> Result<(), EncodingError> {
+    fn test_nullstring_can_prefix() -> EncodingResult<()> {
+        for (s, pre, is_pre, desc) in [
+            ("", "", true, "empty strings"),
+            ("", "a", false, "longer prefix"),
+            ("a", "", true, "empty prefix matches"),
+            ("a", "a", true, "whole string matches"),
+            ("a", "b", false, "entirely different"),
+            ("ab", "a", true, "prefix matches"),
+            ("ab", "b", false, "shorter and entirely different"),
+        ] {
+            let serialized = s.to_string().to_db_bytes()?;
+            let prefixed = String::sub_prefix(pre)?;
+            assert_eq!(serialized.starts_with(&prefixed), is_pre, "{}", desc);
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_nsid_can_prefix() -> EncodingResult<()> {
+        for (s, pre, is_pre, desc) in [
+            ("ab.cd.ef", "", true, "empty prefix"),
+            ("ab.cd.ef", "a", true, "tiny prefix"),
+            ("ab.cd.ef", "abc", false, "bad prefix"),
+            ("ab.cd.ef", "ab", true, "segment prefix"),
+            ("ab.cd.ef", "ab.cd", true, "multi-segment prefix"),
+            ("ab.cd.ef", "ab.cd.ef", true, "full match"),
+            ("ab.cd.ef", "ab.cd.ef.g", false, "prefix longer"),
+        ] {
+            let serialized = Nsid::new(s.to_string()).unwrap().to_db_bytes()?;
+            let prefixed = Nsid::sub_prefix(pre)?;
+            assert_eq!(serialized.starts_with(&prefixed), is_pre, "{}", desc);
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_string_cursor_prefix_roundtrip() -> EncodingResult<()> {
         type TwoThings = DbConcat<String, Cursor>;
         for (lazy_prefix, tired_suffix, desc) in [
             ("", 0, "empty string and cursor"),
···
     }
 
     #[test]
-    fn test_cursor_string_prefix_roundtrip() -> Result<(), EncodingError> {
+    fn test_cursor_string_prefix_roundtrip() -> EncodingResult<()> {
         type TwoThings = DbConcat<Cursor, String>;
         for (tired_prefix, sad_suffix, desc) in [
             (0, "", "empty string and cursor"),
···
     }
 
     #[test]
-    fn test_static_str() -> Result<(), EncodingError> {
+    fn test_static_str() -> EncodingResult<()> {
         #[derive(Debug, PartialEq)]
         struct AStaticStr {}
         impl StaticStr for AStaticStr {
···
     }
 
     #[test]
-    fn test_static_str_empty() -> Result<(), EncodingError> {
+    fn test_static_str_empty() -> EncodingResult<()> {
         #[derive(Debug, PartialEq)]
         struct AnEmptyStr {}
         impl StaticStr for AnEmptyStr {
···
     }
 
     #[test]
-    fn test_static_prefix() -> Result<(), EncodingError> {
+    fn test_static_prefix() -> EncodingResult<()> {
         #[derive(Debug, PartialEq)]
         struct AStaticPrefix {}
         impl StaticStr for AStaticPrefix {
```
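The NSID encoding fix above works because bincode's default string encoding is length-prefixed, so encoded NSIDs neither sort byte-wise like the underlying strings nor support byte-wise prefix scans; the null-terminated `String` encoding restores both properties, and `sub_prefix` (same bytes, no terminator) can match mid-string. A minimal standalone sketch of those two properties; `encode_terminated` and `encode_sub_prefix` are hypothetical stand-ins for the real `DbBytes`/`SubPrefixBytes` impls, not part of the crate:

```rust
/// Simplified stand-ins for the `DbBytes`/`SubPrefixBytes` impls above,
/// showing why null termination makes byte-wise prefix scans valid.

/// Full-value encoding: raw bytes plus a 0x00 terminator.
fn encode_terminated(s: &str) -> Vec<u8> {
    let mut v = s.as_bytes().to_vec();
    v.push(0x00);
    v
}

/// Prefix encoding: raw bytes, NO terminator, so it can match mid-string.
fn encode_sub_prefix(s: &str) -> Vec<u8> {
    s.as_bytes().to_vec()
}

fn main() {
    // prefix scan: "app.bsky.feed" matches "app.bsky.feed.like"...
    let key = encode_terminated("app.bsky.feed.like");
    assert!(key.starts_with(&encode_sub_prefix("app.bsky.feed")));

    // ...but a *fully* encoded shorter value never accidentally
    // prefix-matches, because it ends in the 0x00 terminator:
    let short = encode_terminated("app.bsky.feed");
    assert!(!key.starts_with(&short));

    // and lexicographic order of encodings matches order of the strings:
    assert!(encode_terminated("aa") < encode_terminated("b"));
    println!("prefix + ordering properties hold");
}
```

The ban on interior 0x00 bytes (`EncodingError::StringContainedNull` in the real impl) is what keeps the terminator unambiguous.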
+71 -25
ufos/src/lib.rs
```diff
···
 
 #[derive(Debug, Default, Clone)]
 pub struct CollectionCommits<const LIMIT: usize> {
-    pub total_seen: usize,
+    pub creates: usize,
+    pub updates: usize,
+    pub deletes: usize,
     pub dids_estimate: Sketch<14>,
     pub commits: Vec<UFOsCommit>,
     head: usize,
-    non_creates: usize,
 }
 
 impl<const LIMIT: usize> CollectionCommits<LIMIT> {
···
             self.head = 0;
         }
     }
+    /// lossy-ish commit insertion
+    ///
+    /// - new commits are *always* added to the batch or else rejected as full.
+    /// - when LIMIT is reached, new commits can displace existing `creates`.
+    ///   `update`s and `delete`s are *never* displaced.
+    /// - if all batched `creates` have been displaced, the batch is full.
+    ///
+    /// in general it's rare for commits to be displaced except for very high-
+    /// volume collections such as `app.bsky.feed.like`.
+    ///
+    /// it could be nice in the future to retain all batched commits and just
+    /// drop new `creates` after a limit instead.
     pub fn truncating_insert(
         &mut self,
         commit: UFOsCommit,
         sketch_secret: &SketchSecretPrefix,
     ) -> Result<(), BatchInsertError> {
-        if self.non_creates == LIMIT {
+        if (self.updates + self.deletes) == LIMIT {
+            // nothing can be displaced (only `create`s may be displaced)
             return Err(BatchInsertError::BatchFull(commit));
         }
-        let did = commit.did.clone();
-        let is_create = commit.action.is_create();
+
+        // every kind of commit counts as "user activity"
+        self.dids_estimate
+            .insert(did_element(sketch_secret, &commit.did));
+
+        match commit.action {
+            CommitAction::Put(PutAction {
+                is_update: false, ..
+            }) => {
+                self.creates += 1;
+            }
+            CommitAction::Put(PutAction {
+                is_update: true, ..
+            }) => {
+                self.updates += 1;
+            }
+            CommitAction::Cut => {
+                self.deletes += 1;
+            }
+        }
+
         if self.commits.len() < LIMIT {
+            // normal insert: there's space left to put a new commit at the end
             self.commits.push(commit);
-            if self.commits.capacity() > LIMIT {
-                self.commits.shrink_to(LIMIT); // save mem?????? maybe??
-            }
         } else {
+            // displacement insert: find an old `create` we can displace
             let head_started_at = self.head;
             loop {
                 let candidate = self
···
             }
         }
 
-        if is_create {
-            self.total_seen += 1;
-            self.dids_estimate.insert(did_element(sketch_secret, &did));
-        } else {
-            self.non_creates += 1;
-        }
-
         Ok(())
     }
 }
···
             .truncating_insert(commit, sketch_secret)?;
         Ok(())
     }
-    pub fn total_records(&self) -> usize {
-        self.commits_by_nsid.values().map(|v| v.commits.len()).sum()
-    }
-    pub fn total_seen(&self) -> usize {
-        self.commits_by_nsid.values().map(|v| v.total_seen).sum()
-    }
     pub fn total_collections(&self) -> usize {
         self.commits_by_nsid.len()
     }
···
 #[derive(Debug, Serialize, JsonSchema)]
 pub struct NsidCount {
     nsid: String,
-    records: u64,
+    creates: u64,
     dids_estimate: u64,
 }
 
 #[derive(Debug, Serialize, JsonSchema)]
 pub struct JustCount {
-    records: u64,
+    creates: u64,
     dids_estimate: u64,
 }
···
         &[0u8; 16],
     )?;
 
-    assert_eq!(commits.total_seen, 3);
+    assert_eq!(commits.creates, 3);
     assert_eq!(commits.dids_estimate.estimate(), 1);
     assert_eq!(commits.commits.len(), 2);
···
 }
 
 #[test]
+fn test_truncating_insert_counts_updates() -> anyhow::Result<()> {
+    let mut commits: CollectionCommits<2> = Default::default();
+
+    commits.truncating_insert(
+        UFOsCommit {
+            cursor: Cursor::from_raw_u64(100),
+            did: Did::new("did:plc:whatever".to_string()).unwrap(),
+            rkey: RecordKey::new("rkey-asdf-a".to_string()).unwrap(),
+            rev: "rev-asdf".to_string(),
+            action: CommitAction::Put(PutAction {
+                record: RawValue::from_string("{}".to_string())?,
+                is_update: true,
+            }),
+        },
+        &[0u8; 16],
+    )?;
+
+    assert_eq!(commits.creates, 0);
+    assert_eq!(commits.updates, 1);
+    assert_eq!(commits.deletes, 0);
+    assert_eq!(commits.dids_estimate.estimate(), 1);
+    assert_eq!(commits.commits.len(), 1);
+    Ok(())
+}
+
+#[test]
 fn test_truncating_insert_does_not_truncate_deletes() -> anyhow::Result<()> {
     let mut commits: CollectionCommits<2> = Default::default();
···
         &[0u8; 16],
     )?;
 
-    assert_eq!(commits.total_seen, 2);
+    assert_eq!(commits.creates, 2);
+    assert_eq!(commits.deletes, 1);
     assert_eq!(commits.dids_estimate.estimate(), 1);
     assert_eq!(commits.commits.len(), 2);
```
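For illustration, the displacement behavior described in the new `truncating_insert` doc comment can be modeled standalone: a fixed-size batch where `create`s may be overwritten by newcomers but `delete`s (and `update`s) never are. This is a toy model with hypothetical `ToyCommit`/`ToyBatch` types, not the real `UFOsCommit` machinery; the rotating `head` mirrors how the real loop spreads displacement across old creates:

```rust
/// Toy model of `truncating_insert`'s displacement semantics: a batch of at
/// most LIMIT commits where `create`s may be overwritten but never deletes.
#[derive(Debug, PartialEq)]
enum ToyCommit {
    Create(u64),
    Delete(u64),
}

struct ToyBatch<const LIMIT: usize> {
    commits: Vec<ToyCommit>,
    head: usize, // rotates so displacement spreads over old creates
}

impl<const LIMIT: usize> ToyBatch<LIMIT> {
    fn new() -> Self {
        Self { commits: Vec::new(), head: 0 }
    }
    fn insert(&mut self, commit: ToyCommit) -> Result<(), ToyCommit> {
        if self.commits.len() < LIMIT {
            self.commits.push(commit);
            return Ok(());
        }
        // full: walk from `head` looking for a displaceable `create`
        let started = self.head;
        loop {
            if matches!(self.commits[self.head], ToyCommit::Create(_)) {
                self.commits[self.head] = commit;
                self.head = (self.head + 1) % LIMIT;
                return Ok(());
            }
            self.head = (self.head + 1) % LIMIT;
            if self.head == started {
                return Err(commit); // batch full: nothing displaceable
            }
        }
    }
}

fn main() {
    let mut batch: ToyBatch<2> = ToyBatch::new();
    batch.insert(ToyCommit::Create(1)).unwrap();
    batch.insert(ToyCommit::Delete(2)).unwrap();
    // displaces the create in slot 0; the delete is untouched:
    batch.insert(ToyCommit::Create(3)).unwrap();
    assert_eq!(batch.commits, vec![ToyCommit::Create(3), ToyCommit::Delete(2)]);
    // a new delete displaces the remaining create...
    batch.insert(ToyCommit::Delete(4)).unwrap();
    // ...and with both slots now holding deletes, the batch is truly full:
    assert!(batch.insert(ToyCommit::Create(5)).is_err());
}
```

The real code short-circuits with `BatchInsertError::BatchFull` as soon as `updates + deletes == LIMIT`, which is the same condition this toy loop discovers by exhaustion.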
+5 -3
ufos/src/server.rs
```diff
···
         consumer,
     })
 }
+
+// TODO: replace with normal (🙃) multi-qs value somehow
 fn to_multiple_nsids(s: &str) -> Result<HashSet<Nsid>, String> {
     let mut out = HashSet::new();
     for collection in s.split(',') {
···
 }
 #[derive(Debug, Serialize, JsonSchema)]
 struct TotalCounts {
-    total_records: u64,
+    total_creates: u64,
     dids_estimate: u64,
 }
 /// Get total records seen by collection
···
     let mut seen_by_collection = HashMap::with_capacity(collections.len());
 
     for collection in &collections {
-        let (total_records, dids_estimate) = storage
+        let (total_creates, dids_estimate) = storage
             .get_counts_by_collection(collection)
             .await
             .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?;
···
         seen_by_collection.insert(
             collection.to_string(),
             TotalCounts {
-                total_records,
+                total_creates,
                 dids_estimate,
             },
         );
```
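The comma-splitting that `to_multiple_nsids` does (pending the TODO above) can be exercised like the sketch below; `to_multiple_fake_nsids` is a hypothetical stand-in that validates with a toy three-segment rule instead of the real `Nsid::new`:

```rust
use std::collections::HashSet;

/// Stand-in for `to_multiple_nsids`: split a comma-separated query value
/// and validate each piece (the real code validates via `Nsid::new`).
fn to_multiple_fake_nsids(s: &str) -> Result<HashSet<String>, String> {
    let mut out = HashSet::new();
    for collection in s.split(',') {
        // toy validation: a plausible NSID has at least three dotted segments
        if collection.split('.').count() < 3 {
            return Err(format!("invalid NSID: {collection:?}"));
        }
        out.insert(collection.to_string());
    }
    Ok(out)
}

fn main() {
    let set = to_multiple_fake_nsids("app.bsky.feed.like,app.bsky.feed.post").unwrap();
    assert_eq!(set.len(), 2);
    // duplicates collapse, since the result is a set
    let set = to_multiple_fake_nsids("app.bsky.feed.like,app.bsky.feed.like").unwrap();
    assert_eq!(set.len(), 1);
    assert!(to_multiple_fake_nsids("not-an-nsid").is_err());
}
```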
+66 -62
ufos/src/storage_fjall.rs
```diff
···
 use crate::error::StorageError;
 use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
 use crate::store_types::{
-    AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CountsValue, CursorBucket,
+    AllTimeDidsKey, AllTimeRecordsKey, AllTimeRollupKey, CommitCounts, CountsValue, CursorBucket,
     DeleteAccountQueueKey, DeleteAccountQueueVal, HourTruncatedCursor, HourlyDidsKey,
     HourlyRecordsKey, HourlyRollupKey, HourlyRollupStaticPrefix, JetstreamCursorKey,
     JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey,
···
         }
         out.push(NsidCount {
             nsid: nsid.to_string(),
-            records: merged.records(),
+            creates: merged.counts().creates,
             dids_estimate: merged.dids().estimate() as u64,
         });
     }
···
         }
         let mut ranked: Vec<(Nsid, CountsValue)> = ranked.into_iter().collect();
         match order {
-            OrderCollectionsBy::RecordsCreated => ranked.sort_by_key(|(_, c)| c.records()),
+            OrderCollectionsBy::RecordsCreated => ranked.sort_by_key(|(_, c)| c.counts().creates),
             OrderCollectionsBy::DidsEstimate => ranked.sort_by_key(|(_, c)| c.dids().estimate()),
             OrderCollectionsBy::Lexi { .. } => unreachable!(),
         }
···
             .take(limit)
             .map(|(nsid, cv)| NsidCount {
                 nsid: nsid.to_string(),
-                records: cv.records(),
+                creates: cv.counts().creates,
                 dids_estimate: cv.dids().estimate() as u64,
             })
             .collect();
···
             }
         }
         Ok((
-            total_counts.records(),
+            total_counts.counts().creates,
             total_counts.dids().estimate() as u64,
         ))
     }
···
             .unwrap_or_default();
 
         // now that we have values, we can know the exising ranks
-        let before_records_count = rolled.records();
+        let before_creates_count = rolled.counts().creates;
         let before_dids_estimate = rolled.dids().estimate() as u64;
 
         // update the rollup
         rolled.merge(&counts);
 
-        // replace rank entries
-        let (old_records, new_records, dids) = match rollup {
-            Rollup::Hourly(hourly_cursor) => {
-                let old_records =
-                    HourlyRecordsKey::new(hourly_cursor, before_records_count.into(), &nsid);
-                let new_records = old_records.with_rank(rolled.records().into());
-                let new_estimate = rolled.dids().estimate() as u64;
-                let dids = if new_estimate == before_dids_estimate {
-                    None
-                } else {
-                    let old_dids =
-                        HourlyDidsKey::new(hourly_cursor, before_dids_estimate.into(), &nsid);
-                    let new_dids = old_dids.with_rank(new_estimate.into());
-                    Some((old_dids.to_db_bytes()?, new_dids.to_db_bytes()?))
-                };
-                (old_records.to_db_bytes()?, new_records.to_db_bytes()?, dids)
-            }
-            Rollup::Weekly(weekly_cursor) => {
-                let old_records =
-                    WeeklyRecordsKey::new(weekly_cursor, before_records_count.into(), &nsid);
-                let new_records = old_records.with_rank(rolled.records().into());
-                let new_estimate = rolled.dids().estimate() as u64;
-                let dids = if new_estimate == before_dids_estimate {
-                    None
-                } else {
-                    let old_dids =
-                        WeeklyDidsKey::new(weekly_cursor, before_dids_estimate.into(), &nsid);
-                    let new_dids = old_dids.with_rank(new_estimate.into());
-                    Some((old_dids.to_db_bytes()?, new_dids.to_db_bytes()?))
-                };
-                (old_records.to_db_bytes()?, new_records.to_db_bytes()?, dids)
-            }
-            Rollup::AllTime => {
-                let old_records = AllTimeRecordsKey::new(before_records_count.into(), &nsid);
-                let new_records = old_records.with_rank(rolled.records().into());
-                let new_estimate = rolled.dids().estimate() as u64;
-                let dids = if new_estimate == before_dids_estimate {
-                    None
-                } else {
-                    let old_dids = AllTimeDidsKey::new(before_dids_estimate.into(), &nsid);
-                    let new_dids = old_dids.with_rank(new_estimate.into());
-                    Some((old_dids.to_db_bytes()?, new_dids.to_db_bytes()?))
-                };
-                (old_records.to_db_bytes()?, new_records.to_db_bytes()?, dids)
-            }
-        };
+        // new ranks
+        let new_creates_count = rolled.counts().creates;
+        let new_dids_estimate = rolled.dids().estimate() as u64;
 
-        // replace the ranks
-        batch.remove(&self.rollups, &old_records);
-        batch.insert(&self.rollups, &new_records, "");
-        if let Some((old_dids, new_dids)) = dids {
-            batch.remove(&self.rollups, &old_dids);
-            batch.insert(&self.rollups, &new_dids, "");
+        // update create-ranked secondary index if rank changed
+        if new_creates_count != before_creates_count {
+            let (old_k, new_k) = match rollup {
+                Rollup::Hourly(cursor) => (
+                    HourlyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
+                        .to_db_bytes()?,
+                    HourlyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
+                        .to_db_bytes()?,
+                ),
+                Rollup::Weekly(cursor) => (
+                    WeeklyRecordsKey::new(cursor, before_creates_count.into(), &nsid)
+                        .to_db_bytes()?,
+                    WeeklyRecordsKey::new(cursor, new_creates_count.into(), &nsid)
+                        .to_db_bytes()?,
+                ),
+                Rollup::AllTime => (
+                    AllTimeRecordsKey::new(before_creates_count.into(), &nsid).to_db_bytes()?,
+                    AllTimeRecordsKey::new(new_creates_count.into(), &nsid).to_db_bytes()?,
+                ),
+            };
+            batch.remove(&self.rollups, &old_k); // TODO: when fjall gets weak delete, this will hopefully work way better
+            batch.insert(&self.rollups, &new_k, "");
+        }
+
+        // update dids-ranked secondary index if rank changed
+        if new_dids_estimate != before_dids_estimate {
+            let (old_k, new_k) = match rollup {
+                Rollup::Hourly(cursor) => (
+                    HourlyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
+                        .to_db_bytes()?,
+                    HourlyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
+                        .to_db_bytes()?,
+                ),
+                Rollup::Weekly(cursor) => (
+                    WeeklyDidsKey::new(cursor, before_dids_estimate.into(), &nsid)
+                        .to_db_bytes()?,
+                    WeeklyDidsKey::new(cursor, new_dids_estimate.into(), &nsid)
+                        .to_db_bytes()?,
+                ),
+                Rollup::AllTime => (
+                    AllTimeDidsKey::new(before_dids_estimate.into(), &nsid).to_db_bytes()?,
+                    AllTimeDidsKey::new(new_dids_estimate.into(), &nsid).to_db_bytes()?,
+                ),
+            };
+            batch.remove(&self.rollups, &old_k); // TODO: when fjall gets weak delete, this will hopefully work way better
+            batch.insert(&self.rollups, &new_k, "");
         }
 
-        // replace the rollup
+        // replace the main counts rollup
         batch.insert(&self.rollups, &rollup_key_bytes, &rolled.to_db_bytes()?);
     }
···
             }
         }
         let live_counts_key: LiveCountsKey = (latest, &nsid).into();
-        let counts_value = CountsValue::new(commits.total_seen as u64, commits.dids_estimate);
+        let counts_value = CountsValue::new(
+            CommitCounts {
+                creates: commits.creates as u64,
+                updates: commits.updates as u64,
+                deletes: commits.deletes as u64,
+            },
+            commits.dids_estimate,
+        );
         batch.insert(
             &self.rollups,
             &live_counts_key.to_db_bytes()?,
···
         );
         write.insert_batch(batch.batch)?;
 
-        let (records, dids) = read.get_counts_by_collection(&collection)?;
-        assert_eq!(records, 1);
+        let (creates, dids) = read.get_counts_by_collection(&collection)?;
+        assert_eq!(creates, 1);
         assert_eq!(dids, 1);
 
         let records = read.get_records_by_collections([collection].into(), 2, false)?;
```
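The rewritten rollup code is an instance of a rank-in-key secondary index: the count is embedded in the key so the partition stays sorted by rank, and a rank change means deleting the old key and inserting a new one (the `batch.remove`/`batch.insert` pairs above). A minimal sketch of the same pattern over a `BTreeMap`; the `(count, nsid)` tuple key is illustrative, not the actual fjall key encoding:

```rust
use std::collections::BTreeMap;

/// Rank-in-key secondary index: (count, nsid) -> (), kept sorted by count.
/// When a collection's count changes, replace its old key with a new one,
/// mirroring the `batch.remove` / `batch.insert` pair in the rollup code.
fn bump_rank(
    index: &mut BTreeMap<(u64, String), ()>,
    nsid: &str,
    before: u64,
    after: u64,
) {
    if after != before {
        index.remove(&(before, nsid.to_string()));
        index.insert((after, nsid.to_string()), ());
    }
}

fn main() {
    let mut index = BTreeMap::new();
    index.insert((5, "ab.cd.ef".to_string()), ());
    index.insert((9, "gh.ij.kl".to_string()), ());

    // ab.cd.ef's creates count goes 5 -> 12: it should now rank first
    bump_rank(&mut index, "ab.cd.ef", 5, 12);

    // iterate descending to get a top-N ranking for free
    let ranked: Vec<_> = index.keys().rev().map(|(n, s)| (s.clone(), *n)).collect();
    assert_eq!(ranked[0], ("ab.cd.ef".to_string(), 12));
    assert_eq!(ranked[1], ("gh.ij.kl".to_string(), 9));
}
```

Skipping the remove/insert pair when the rank is unchanged (the `if` guards added in this PR) avoids churning tombstones in the LSM tree for no-op updates.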
+12 -5
ufos/src/storage_mem.rs
```diff
···
 use crate::error::StorageError;
 use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
 use crate::store_types::{
-    AllTimeRollupKey, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal,
+    AllTimeRollupKey, CommitCounts, CountsValue, DeleteAccountQueueKey, DeleteAccountQueueVal,
     HourTruncatedCursor, HourlyRollupKey, JetstreamCursorKey, JetstreamCursorValue,
     JetstreamEndpointKey, JetstreamEndpointValue, LiveCountsKey, NewRollupCursorKey,
     NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey,
···
             }
         }
         Ok((
-            total_counts.records(),
+            total_counts.counts().creates,
             total_counts.dids().estimate() as u64,
         ))
···
         assert_eq!(n, tripppin.len());
         assert_eq!(counts.prefix, and_back.prefix);
         assert_eq!(counts.dids().estimate(), and_back.dids().estimate());
-        if counts.records() > 20000000 {
+        if counts.counts().creates > 20000000 {
             panic!("COUNTS maybe wtf? {counts:?}")
         }
         // assert_eq!(rolled, and_back);
···
         assert_eq!(n, tripppin.len());
         assert_eq!(rolled.prefix, and_back.prefix);
         assert_eq!(rolled.dids().estimate(), and_back.dids().estimate());
-        if rolled.records() > 20000000 {
+        if rolled.counts().creates > 20000000 {
             panic!("maybe wtf? {rolled:?}")
         }
         // assert_eq!(rolled, and_back);
···
             }
         }
         let live_counts_key: LiveCountsKey = (latest, &nsid).into();
-        let counts_value = CountsValue::new(commits.total_seen as u64, commits.dids_estimate);
+        let counts_value = CountsValue::new(
+            CommitCounts {
+                creates: commits.creates as u64,
+                updates: commits.updates as u64,
+                deletes: commits.deletes as u64,
+            },
+            commits.dids_estimate,
+        );
         batch.insert(
             &self.rollups,
             &live_counts_key.to_db_bytes()?,
```
+39 -23
ufos/src/store_types.rs
```diff
···
         )
     }
 }
-#[derive(Debug, PartialEq, Decode, Encode)]
-pub struct TotalRecordsValue(pub u64);
-impl UseBincodePlz for TotalRecordsValue {}
 
-#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
+#[derive(Debug, Clone, Copy, Default, PartialEq, Decode, Encode)]
+pub struct CommitCounts {
+    pub creates: u64,
+    pub updates: u64,
+    pub deletes: u64,
+}
+impl CommitCounts {
+    pub fn merge(&mut self, other: &Self) {
+        self.creates += other.creates;
+        self.updates += other.updates;
+        self.deletes += other.deletes;
+    }
+}
+impl UseBincodePlz for CommitCounts {}
+
+#[derive(Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
 pub struct EstimatedDidsValue(pub Sketch<14>);
 impl SerdeBytes for EstimatedDidsValue {}
 impl DbBytes for EstimatedDidsValue {
···
     }
 }
 
-pub type CountsValue = DbConcat<TotalRecordsValue, EstimatedDidsValue>;
+pub type CountsValue = DbConcat<CommitCounts, EstimatedDidsValue>;
 impl CountsValue {
-    pub fn new(total: u64, dids: Sketch<14>) -> Self {
+    pub fn new(counts: CommitCounts, dids: Sketch<14>) -> Self {
         Self {
-            prefix: TotalRecordsValue(total),
+            prefix: counts,
             suffix: EstimatedDidsValue(dids),
         }
     }
-    pub fn records(&self) -> u64 {
-        self.prefix.0
+    pub fn counts(&self) -> CommitCounts {
+        self.prefix
     }
     pub fn dids(&self) -> &Sketch<14> {
         &self.suffix.0
     }
     pub fn merge(&mut self, other: &Self) {
-        self.prefix.0 += other.records();
-        self.suffix.0.merge(other.dids());
-    }
-}
-impl Default for CountsValue {
-    fn default() -> Self {
-        Self {
-            prefix: TotalRecordsValue(0),
-            suffix: EstimatedDidsValue(Sketch::<14>::default()),
-        }
+        self.prefix.merge(&other.prefix);
+        self.suffix.0.merge(&other.suffix.0);
     }
 }
 impl From<&CountsValue> for JustCount {
     fn from(cv: &CountsValue) -> Self {
         Self {
-            records: cv.records(),
+            creates: cv.counts().creates,
             dids_estimate: cv.dids().estimate() as u64,
         }
     }
···
 #[cfg(test)]
 mod test {
     use super::{
-        CountsValue, Cursor, CursorBucket, Did, EncodingError, HourTruncatedCursor,
+        CommitCounts, CountsValue, Cursor, CursorBucket, Did, EncodingError, HourTruncatedCursor,
         HourlyRollupKey, Nsid, Sketch, HOUR_IN_MICROS, WEEK_IN_MICROS,
     };
     use crate::db_types::DbBytes;
···
                 Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig7{i}")).unwrap(),
             ));
         }
-        let original = CountsValue::new(123, estimator.clone());
+        let original = CountsValue::new(
+            CommitCounts {
+                creates: 123,
+                ..Default::default()
+            },
+            estimator.clone(),
+        );
         let serialized = original.to_db_bytes()?;
         let (restored, bytes_consumed) = CountsValue::from_db_bytes(&serialized)?;
         assert_eq!(restored, original);
···
                 Did::new(format!("did:plc:inze6wrmsm7pjl7yta3oig{i}")).unwrap(),
             ));
         }
-        let original = CountsValue::new(123, estimator);
+        let original = CountsValue::new(
+            CommitCounts {
+                creates: 123,
+                ..Default::default()
+            },
+            estimator,
+        );
         let serialized = original.to_db_bytes()?;
         let (restored, bytes_consumed) = CountsValue::from_db_bytes(&serialized)?;
         assert_eq!(restored, original);
```
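The new `CountsValue` merge composes field-wise: the `CommitCounts` counters add, while the dids sketch merges so distinct users are de-duplicated rather than summed. A hedged sketch of that arithmetic using a hypothetical `ToyCounts` type with an exact `HashSet` standing in for the probabilistic `Sketch<14>`:

```rust
use std::collections::HashSet;

/// Stand-in for `CommitCounts` + the dids sketch: counters add, while the
/// did "estimator" (an exact set here, a probabilistic sketch in the real
/// code) unions, so merged dids are de-duplicated rather than summed.
#[derive(Default, Debug, PartialEq)]
struct ToyCounts {
    creates: u64,
    updates: u64,
    deletes: u64,
    dids: HashSet<String>,
}

impl ToyCounts {
    fn merge(&mut self, other: &Self) {
        self.creates += other.creates;
        self.updates += other.updates;
        self.deletes += other.deletes;
        self.dids.extend(other.dids.iter().cloned());
    }
}

fn main() {
    let mut hour_a = ToyCounts {
        creates: 2,
        dids: ["did:plc:alice".to_string()].into(),
        ..Default::default()
    };
    let hour_b = ToyCounts {
        creates: 1,
        deletes: 1,
        dids: ["did:plc:alice".to_string(), "did:plc:bob".to_string()].into(),
        ..Default::default()
    };
    hour_a.merge(&hour_b);
    assert_eq!(hour_a.creates, 3);
    assert_eq!(hour_a.deletes, 1);
    assert_eq!(hour_a.dids.len(), 2); // alice counted once, not twice
}
```

Keeping creates, updates, and deletes as separate counters is what lets the API report `creates` (the old `records` field) without conflating it with churn from updates and deletes.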