Rust implementation of the CVM algorithm for counting distinct elements in a stream
1//! A randomized binary search tree (treap) implementation
2//!
3//! A treap maintains both BST property (for keys) and heap property (for priorities).
4//!
5//! This implementation was inspired by the treap exploration in <https://github.com/apanda/cvm>
6//! (BSD-2-Clause license), but is an independent implementation tailored specifically
7//! for the CVM algorithm's requirements.
8//!
9//! ## Key Differences from apanda/cvm treap:
10//!
11//! 1. **Simpler structure**: We don't use a separate Element type; keys and priorities are
12//! stored directly in nodes
13//! 2. **Random priorities**: apanda's implementation expects explicit priorities, while ours
14//! generates random priorities at insertion time
15//! 3. **No allocation tracking**: apanda uses `alloc_counter` for performance analysis
16//! 4. **Simplified delete**: Our delete returns a bool, apanda's has more complex handling
17//! 5. **Retain operation**: We added a specialized `retain` method for CVM's "clear half"
18//! 6. **No Display trait**: We focus on the minimal API needed for CVM
19//! 7. **Insert behavior**: apanda's `insert_or_replace` updates existing elements; ours
20//! keeps the original (no update) which is what CVM needs
21//!
22//! ## Design Decisions
23//!
24//! Unlike general-purpose treap implementations, this one is optimized for CVM:
25//! - No key-value mapping: CVM only needs to track unique elements
26//! - Simplified API: Only operations needed for CVM are implemented
27//! - Efficient `retain`: Optimized for the "clear about half" operation
28//! - RNG integration: Accepts an external RNG for consistent randomness
29
30use rand::Rng;
31use std::cmp::Ordering;
32
/// One element of the treap: a key, its heap priority, and the two child links.
struct Node<T> {
    /// Search key; child links are chosen by comparing against this value.
    key: T,
    /// Heap priority assigned when the node is created.
    priority: u32,
    /// Owned left subtree, or `None` when there is no left child.
    left: Option<Box<Node<T>>>,
    /// Owned right subtree, or `None` when there is no right child.
    right: Option<Box<Node<T>>>,
}
40
41impl<T> Node<T> {
42 fn new(key: T, priority: u32) -> Self {
43 Node {
44 key,
45 priority,
46 left: None,
47 right: None,
48 }
49 }
50}
51
52/// A treap data structure
53///
54/// Key differences from typical treap implementations:
55/// 1. Priorities are generated at insertion time using the provided RNG
56/// 2. The `retain` operation is optimized for the CVM algorithm's "clear half" operation
57/// 3. No support for key-value pairs - only keys are stored (values are implicit)
58/// 4. No split operation as it's not needed for CVM
59/// 5. Insert doesn't update existing keys - matching CVM's requirement
60pub struct Treap<T> {
61 root: Option<Box<Node<T>>>,
62 size: usize,
63}
64
65impl<T: Ord> Treap<T> {
66 /// Create a new empty treap
67 pub fn new() -> Self {
68 Treap {
69 root: None,
70 size: 0,
71 }
72 }
73
74 /// Get the number of elements in the treap
75 pub fn len(&self) -> usize {
76 self.size
77 }
78
79 /// Check if the treap is empty
80 #[allow(dead_code)]
81 pub fn is_empty(&self) -> bool {
82 self.size == 0
83 }
84
85 /// Insert a key with a random priority
86 pub fn insert<R: Rng>(&mut self, key: T, rng: &mut R) {
87 let priority = rng.random();
88 self.root = Self::insert_node(self.root.take(), key, priority);
89 self.size += 1;
90 }
91
92 /// Check if the treap contains a key
93 pub fn contains(&self, key: &T) -> bool {
94 Self::contains_node(&self.root, key)
95 }
96
97 /// Remove a key from the treap
98 pub fn remove(&mut self, key: &T) -> bool {
99 let (new_root, removed) = Self::remove_node(self.root.take(), key);
100 self.root = new_root;
101 if removed {
102 self.size -= 1;
103 }
104 removed
105 }
106
107 /// Clear the treap
108 #[allow(dead_code)]
109 pub fn clear(&mut self) {
110 self.root = None;
111 self.size = 0;
112 }
113
114 /// Apply a function to each element, removing those for which it returns false
115 pub fn retain<F>(&mut self, mut f: F)
116 where
117 F: FnMut(&T) -> bool,
118 {
119 let (new_root, new_size) = Self::retain_node(self.root.take(), &mut f);
120 self.root = new_root;
121 self.size = new_size;
122 }
123
124 // Helper function to insert a node
125 fn insert_node(node: Option<Box<Node<T>>>, key: T, priority: u32) -> Option<Box<Node<T>>> {
126 match node {
127 None => Some(Box::new(Node::new(key, priority))),
128 Some(mut n) => {
129 match key.cmp(&n.key) {
130 Ordering::Less => {
131 n.left = Self::insert_node(n.left, key, priority);
132 // Maintain heap property
133 if n.left.as_ref().unwrap().priority > n.priority {
134 Self::rotate_right(n)
135 } else {
136 Some(n)
137 }
138 }
139 Ordering::Greater => {
140 n.right = Self::insert_node(n.right, key, priority);
141 // Maintain heap property
142 if n.right.as_ref().unwrap().priority > n.priority {
143 Self::rotate_left(n)
144 } else {
145 Some(n)
146 }
147 }
148 Ordering::Equal => Some(n), // Key already exists, do nothing
149 }
150 }
151 }
152 }
153
154 // Helper function to check if a node contains a key
155 fn contains_node(node: &Option<Box<Node<T>>>, key: &T) -> bool {
156 match node {
157 None => false,
158 Some(n) => match key.cmp(&n.key) {
159 Ordering::Less => Self::contains_node(&n.left, key),
160 Ordering::Greater => Self::contains_node(&n.right, key),
161 Ordering::Equal => true,
162 },
163 }
164 }
165
166 // Helper function to remove a node
167 fn remove_node(node: Option<Box<Node<T>>>, key: &T) -> (Option<Box<Node<T>>>, bool) {
168 match node {
169 None => (None, false),
170 Some(mut n) => match key.cmp(&n.key) {
171 Ordering::Less => {
172 let (new_left, removed) = Self::remove_node(n.left, key);
173 n.left = new_left;
174 (Some(n), removed)
175 }
176 Ordering::Greater => {
177 let (new_right, removed) = Self::remove_node(n.right, key);
178 n.right = new_right;
179 (Some(n), removed)
180 }
181 Ordering::Equal => {
182 // Found the node to remove
183 (Self::merge(n.left, n.right), true)
184 }
185 },
186 }
187 }
188
189 // Merge two subtrees
190 fn merge(left: Option<Box<Node<T>>>, right: Option<Box<Node<T>>>) -> Option<Box<Node<T>>> {
191 match (left, right) {
192 (None, right) => right,
193 (left, None) => left,
194 (Some(l), Some(r)) => {
195 if l.priority > r.priority {
196 let mut l = l;
197 l.right = Self::merge(l.right, Some(r));
198 Some(l)
199 } else {
200 let mut r = r;
201 r.left = Self::merge(Some(l), r.left);
202 Some(r)
203 }
204 }
205 }
206 }
207
208 // Rotate right
209 fn rotate_right(mut node: Box<Node<T>>) -> Option<Box<Node<T>>> {
210 let mut new_root = node.left.take().unwrap();
211 node.left = new_root.right.take();
212 new_root.right = Some(node);
213 Some(new_root)
214 }
215
216 // Rotate left
217 fn rotate_left(mut node: Box<Node<T>>) -> Option<Box<Node<T>>> {
218 let mut new_root = node.right.take().unwrap();
219 node.right = new_root.left.take();
220 new_root.left = Some(node);
221 Some(new_root)
222 }
223
224 // Retain nodes that satisfy the predicate
225 fn retain_node<F>(node: Option<Box<Node<T>>>, f: &mut F) -> (Option<Box<Node<T>>>, usize)
226 where
227 F: FnMut(&T) -> bool,
228 {
229 match node {
230 None => (None, 0),
231 Some(mut n) => {
232 let (new_left, left_size) = Self::retain_node(n.left, f);
233 let (new_right, right_size) = Self::retain_node(n.right, f);
234
235 if f(&n.key) {
236 n.left = new_left;
237 n.right = new_right;
238 (Some(n), left_size + right_size + 1)
239 } else {
240 // Remove this node by merging its subtrees
241 let merged = Self::merge(new_left, new_right);
242 (merged, left_size + right_size)
243 }
244 }
245 }
246 }
247}
248
249impl<T: Ord> Default for Treap<T> {
250 fn default() -> Self {
251 Self::new()
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use rand::SeedableRng;
    use rand::rngs::StdRng;

    /// Seed shared by every test so the generated priorities are reproducible.
    const SEED: u64 = 42;

    /// Builds a treap by inserting `keys` in iteration order, using an RNG
    /// freshly seeded with `SEED`.
    fn treap_of<I: IntoIterator<Item = i32>>(keys: I) -> Treap<i32> {
        let mut rng = StdRng::seed_from_u64(SEED);
        let mut treap = Treap::new();
        for key in keys {
            treap.insert(key, &mut rng);
        }
        treap
    }

    /// Inserted keys are found, an absent key is not, and `len` matches.
    #[test]
    fn test_insert_and_contains() {
        let treap = treap_of([5, 3, 7]);

        for present in [5, 3, 7] {
            assert!(treap.contains(&present));
        }
        assert!(!treap.contains(&1));
        assert_eq!(treap.len(), 3);
    }

    /// Removing a key succeeds once and reports `false` the second time.
    #[test]
    fn test_remove() {
        let mut treap = treap_of([5, 3, 7]);

        assert!(treap.remove(&3));
        assert!(!treap.contains(&3));
        assert_eq!(treap.len(), 2);

        // The key is gone, so a second removal must be a no-op.
        assert!(!treap.remove(&3));
    }

    /// `retain` keeps exactly the keys accepted by the predicate.
    #[test]
    fn test_retain() {
        let mut treap = treap_of(0..10);

        treap.retain(|&x| x % 2 == 0);
        assert_eq!(treap.len(), 5);

        for i in 0..10 {
            let is_even = i % 2 == 0;
            assert_eq!(treap.contains(&i), is_even);
        }
    }
}