Rust implementation of the CVM algorithm for counting distinct elements in a stream
at main 10 kB view raw
1//! A randomized binary search tree (treap) implementation 2//! 3//! A treap maintains both BST property (for keys) and heap property (for priorities). 4//! 5//! This implementation was inspired by the treap exploration in <https://github.com/apanda/cvm> 6//! (BSD-2-Clause license), but is an independent implementation tailored specifically 7//! for the CVM algorithm's requirements. 8//! 9//! ## Key Differences from apanda/cvm treap: 10//! 11//! 1. **Simpler structure**: We don't use a separate Element type; keys and priorities are 12//! stored directly in nodes 13//! 2. **Random priorities**: apanda's implementation expects explicit priorities, while ours 14//! generates random priorities at insertion time 15//! 3. **No allocation tracking**: apanda uses `alloc_counter` for performance analysis 16//! 4. **Simplified delete**: Our delete returns a bool, apanda's has more complex handling 17//! 5. **Retain operation**: We added a specialized `retain` method for CVM's "clear half" 18//! 6. **No Display trait**: We focus on the minimal API needed for CVM 19//! 7. **Insert behavior**: apanda's `insert_or_replace` updates existing elements; ours 20//! keeps the original (no update) which is what CVM needs 21//! 22//! ## Design Decisions 23//! 24//! Unlike general-purpose treap implementations, this one is optimized for CVM: 25//! - No key-value mapping: CVM only needs to track unique elements 26//! - Simplified API: Only operations needed for CVM are implemented 27//! - Efficient `retain`: Optimized for the "clear about half" operation 28//! - RNG integration: Accepts an external RNG for consistent randomness 29 30use rand::Rng; 31use std::cmp::Ordering; 32 33/// A node in the treap 34struct Node<T> { 35 key: T, 36 priority: u32, 37 left: Option<Box<Node<T>>>, 38 right: Option<Box<Node<T>>>, 39} 40 41impl<T> Node<T> { 42 fn new(key: T, priority: u32) -> Self { 43 Node { 44 key, 45 priority, 46 left: None, 47 right: None, 48 } 49 } 50} 51 52/// A treap data structure 53/// 54/// Key differences from typical treap implementations: 55/// 1. Priorities are generated at insertion time using the provided RNG 56/// 2. The `retain` operation is optimized for the CVM algorithm's "clear half" operation 57/// 3. No support for key-value pairs - only keys are stored (values are implicit) 58/// 4. No split operation as it's not needed for CVM 59/// 5. Insert doesn't update existing keys - matching CVM's requirement 60pub struct Treap<T> { 61 root: Option<Box<Node<T>>>, 62 size: usize, 63} 64 65impl<T: Ord> Treap<T> { 66 /// Create a new empty treap 67 pub fn new() -> Self { 68 Treap { 69 root: None, 70 size: 0, 71 } 72 } 73 74 /// Get the number of elements in the treap 75 pub fn len(&self) -> usize { 76 self.size 77 } 78 79 /// Check if the treap is empty 80 #[allow(dead_code)] 81 pub fn is_empty(&self) -> bool { 82 self.size == 0 83 } 84 85 /// Insert a key with a random priority 86 pub fn insert<R: Rng>(&mut self, key: T, rng: &mut R) { 87 let priority = rng.random(); 88 self.root = Self::insert_node(self.root.take(), key, priority); 89 self.size += 1; 90 } 91 92 /// Check if the treap contains a key 93 pub fn contains(&self, key: &T) -> bool { 94 Self::contains_node(&self.root, key) 95 } 96 97 /// Remove a key from the treap 98 pub fn remove(&mut self, key: &T) -> bool { 99 let (new_root, removed) = Self::remove_node(self.root.take(), key); 100 self.root = new_root; 101 if removed { 102 self.size -= 1; 103 } 104 removed 105 } 106 107 /// Clear the treap 108 #[allow(dead_code)] 109 pub fn clear(&mut self) { 110 self.root = None; 111 self.size = 0; 112 } 113 114 /// Apply a function to each element, removing those for which it returns false 115 pub fn retain<F>(&mut self, mut f: F) 116 where 117 F: FnMut(&T) -> bool, 118 { 119 let (new_root, new_size) = Self::retain_node(self.root.take(), &mut f); 120 self.root = new_root; 121 self.size = new_size; 122 } 123 124 // Helper function to insert a node 125 fn insert_node(node: Option<Box<Node<T>>>, key: T, priority: u32) -> Option<Box<Node<T>>> { 126 match node { 127 None => Some(Box::new(Node::new(key, priority))), 128 Some(mut n) => { 129 match key.cmp(&n.key) { 130 Ordering::Less => { 131 n.left = Self::insert_node(n.left, key, priority); 132 // Maintain heap property 133 if n.left.as_ref().unwrap().priority > n.priority { 134 Self::rotate_right(n) 135 } else { 136 Some(n) 137 } 138 } 139 Ordering::Greater => { 140 n.right = Self::insert_node(n.right, key, priority); 141 // Maintain heap property 142 if n.right.as_ref().unwrap().priority > n.priority { 143 Self::rotate_left(n) 144 } else { 145 Some(n) 146 } 147 } 148 Ordering::Equal => Some(n), // Key already exists, do nothing 149 } 150 } 151 } 152 } 153 154 // Helper function to check if a node contains a key 155 fn contains_node(node: &Option<Box<Node<T>>>, key: &T) -> bool { 156 match node { 157 None => false, 158 Some(n) => match key.cmp(&n.key) { 159 Ordering::Less => Self::contains_node(&n.left, key), 160 Ordering::Greater => Self::contains_node(&n.right, key), 161 Ordering::Equal => true, 162 }, 163 } 164 } 165 166 // Helper function to remove a node 167 fn remove_node(node: Option<Box<Node<T>>>, key: &T) -> (Option<Box<Node<T>>>, bool) { 168 match node { 169 None => (None, false), 170 Some(mut n) => match key.cmp(&n.key) { 171 Ordering::Less => { 172 let (new_left, removed) = Self::remove_node(n.left, key); 173 n.left = new_left; 174 (Some(n), removed) 175 } 176 Ordering::Greater => { 177 let (new_right, removed) = Self::remove_node(n.right, key); 178 n.right = new_right; 179 (Some(n), removed) 180 } 181 Ordering::Equal => { 182 // Found the node to remove 183 (Self::merge(n.left, n.right), true) 184 } 185 }, 186 } 187 } 188 189 // Merge two subtrees 190 fn merge(left: Option<Box<Node<T>>>, right: Option<Box<Node<T>>>) -> Option<Box<Node<T>>> { 191 match (left, right) { 192 (None, right) => right, 193 (left, None) => left, 194 (Some(l), Some(r)) => { 195 if l.priority > r.priority { 196 let mut l = l; 197 l.right = Self::merge(l.right, Some(r)); 198 Some(l) 199 } else { 200 let mut r = r; 201 r.left = Self::merge(Some(l), r.left); 202 Some(r) 203 } 204 } 205 } 206 } 207 208 // Rotate right 209 fn rotate_right(mut node: Box<Node<T>>) -> Option<Box<Node<T>>> { 210 let mut new_root = node.left.take().unwrap(); 211 node.left = new_root.right.take(); 212 new_root.right = Some(node); 213 Some(new_root) 214 } 215 216 // Rotate left 217 fn rotate_left(mut node: Box<Node<T>>) -> Option<Box<Node<T>>> { 218 let mut new_root = node.right.take().unwrap(); 219 node.right = new_root.left.take(); 220 new_root.left = Some(node); 221 Some(new_root) 222 } 223 224 // Retain nodes that satisfy the predicate 225 fn retain_node<F>(node: Option<Box<Node<T>>>, f: &mut F) -> (Option<Box<Node<T>>>, usize) 226 where 227 F: FnMut(&T) -> bool, 228 { 229 match node { 230 None => (None, 0), 231 Some(mut n) => { 232 let (new_left, left_size) = Self::retain_node(n.left, f); 233 let (new_right, right_size) = Self::retain_node(n.right, f); 234 235 if f(&n.key) { 236 n.left = new_left; 237 n.right = new_right; 238 (Some(n), left_size + right_size + 1) 239 } else { 240 // Remove this node by merging its subtrees 241 let merged = Self::merge(new_left, new_right); 242 (merged, left_size + right_size) 243 } 244 } 245 } 246 } 247} 248 249impl<T: Ord> Default for Treap<T> { 250 fn default() -> Self { 251 Self::new() 252 } 253} 254 255#[cfg(test)] 256mod tests { 257 use super::*; 258 use rand::SeedableRng; 259 use rand::rngs::StdRng; 260 261 #[test] 262 fn test_insert_and_contains() { 263 let mut treap = Treap::new(); 264 let mut rng = StdRng::seed_from_u64(42); 265 266 treap.insert(5, &mut rng); 267 treap.insert(3, &mut rng); 268 treap.insert(7, &mut rng); 269 270 assert!(treap.contains(&5)); 271 assert!(treap.contains(&3)); 272 assert!(treap.contains(&7)); 273 assert!(!treap.contains(&1)); 274 assert_eq!(treap.len(), 3); 275 } 276 277 #[test] 278 fn test_remove() { 279 let mut treap = Treap::new(); 280 let mut rng = StdRng::seed_from_u64(42); 281 282 treap.insert(5, &mut rng); 283 treap.insert(3, &mut rng); 284 treap.insert(7, &mut rng); 285 286 assert!(treap.remove(&3)); 287 assert!(!treap.contains(&3)); 288 assert_eq!(treap.len(), 2); 289 290 assert!(!treap.remove(&3)); // Already removed 291 } 292 293 #[test] 294 fn test_retain() { 295 let mut treap = Treap::new(); 296 let mut rng = StdRng::seed_from_u64(42); 297 298 for i in 0..10 { 299 treap.insert(i, &mut rng); 300 } 301 302 treap.retain(|&x| x % 2 == 0); 303 assert_eq!(treap.len(), 5); 304 305 for i in 0..10 { 306 if i % 2 == 0 { 307 assert!(treap.contains(&i)); 308 } else { 309 assert!(!treap.contains(&i)); 310 } 311 } 312 } 313}