//! Cursor-based pagination utilities. //! //! Cursors encode the position in a result set as base64(field1|field2|...|cid) //! to enable stable pagination even when new records are inserted. //! //! The cursor format: //! - All sort field values are included in the cursor //! - Values are separated by pipe (|) characters //! - CID is always the last element as the ultimate tiebreaker use super::types::SortField; use crate::models::Record; use base64::{Engine as _, engine::general_purpose}; /// Generates a cursor from a record based on the sort configuration. /// /// Extracts all sort field values from the record and encodes them along with the CID. /// Format: `base64(field1_value|field2_value|...|cid)` /// /// # Arguments /// * `record` - The record to generate a cursor for /// * `sort_by` - Optional array defining sort fields /// /// # Returns /// Base64-encoded cursor string pub fn generate_cursor_from_record(record: &Record, sort_by: Option<&Vec>) -> String { let mut cursor_parts = Vec::new(); // Extract values for all sort fields if let Some(sort_fields) = sort_by { for sort_field in sort_fields { let field_value = extract_field_value(record, &sort_field.field); cursor_parts.push(field_value); } } // Always add CID as the final tiebreaker cursor_parts.push(record.cid.clone()); // Join with pipe and encode let cursor_content = cursor_parts.join("|"); general_purpose::URL_SAFE_NO_PAD.encode(cursor_content) } /// Extracts a field value from a record. /// /// Handles both table columns and JSON fields with nested paths. 
fn extract_field_value(record: &Record, field: &str) -> String { match field { "indexed_at" => record.indexed_at.to_rfc3339(), "uri" => record.uri.clone(), "cid" => record.cid.clone(), "did" => record.did.clone(), "collection" => record.collection.clone(), _ => { // Handle nested JSON paths let field_path: Vec<&str> = field.split('.').collect(); let mut value = &record.json; for key in &field_path { value = match value.get(key) { Some(v) => v, None => return "NULL".to_string(), }; } match value { serde_json::Value::String(s) => s.clone(), serde_json::Value::Number(n) => n.to_string(), serde_json::Value::Bool(b) => b.to_string(), serde_json::Value::Null => "NULL".to_string(), _ => "NULL".to_string(), } } } } /// Decoded cursor components for pagination. #[derive(Debug, Clone)] pub struct DecodedCursor { /// Field values in the order they appear in sortBy pub field_values: Vec, /// CID (always the last element) pub cid: String, } /// Decodes a base64-encoded cursor back into its components. 
///
/// The cursor format is: `base64(field1|field2|...|cid)`
///
/// # Arguments
/// * `cursor` - Base64-encoded cursor string
/// * `sort_by` - Optional array of sort fields to validate cursor format
///
/// # Returns
/// Result containing DecodedCursor or error if decoding fails
///
/// # Errors
/// Returns a descriptive message when the cursor is not valid URL-safe unpadded base64, when
/// the decoded bytes are not UTF-8, or when the number of pipe-separated parts differs from
/// `sort_by.len() + 1` (exactly one part when `sort_by` is `None`).
pub fn decode_cursor(
    cursor: &str,
    sort_by: Option<&Vec<SortField>>,
) -> Result<DecodedCursor, String> {
    let decoded_bytes = general_purpose::URL_SAFE_NO_PAD
        .decode(cursor)
        .map_err(|e| format!("Failed to decode base64: {}", e))?;
    let decoded_str = String::from_utf8(decoded_bytes)
        .map_err(|e| format!("Invalid UTF-8 in cursor: {}", e))?;

    let parts: Vec<&str> = decoded_str.split('|').collect();

    // Validate cursor format matches sortBy fields: one part per sort field plus the CID.
    let expected_parts = sort_by.map_or(1, |fields| fields.len() + 1);
    if parts.len() != expected_parts {
        return Err(format!(
            "Invalid cursor format: expected {} parts, got {}",
            expected_parts,
            parts.len()
        ));
    }

    // The CID is the last part; everything before it is a sort field value.
    match parts.split_last() {
        Some((cid, field_values)) => Ok(DecodedCursor {
            field_values: field_values
                .iter()
                .map(|value| (*value).to_owned())
                .collect(),
            cid: (*cid).to_owned(),
        }),
        // `str::split` always yields at least one item, so this arm is defensive only.
        None => Err("Invalid cursor format: cursor is empty".to_string()),
    }
}

/// Builds cursor-based WHERE conditions for proper multi-field pagination.
///
/// Creates progressive equality checks for stable multi-field sorting.
/// For each field, we OR together:
/// 1. field1 > cursor_value1
/// 2. field1 = cursor_value1 AND field2 > cursor_value2
/// 3. field1 = cursor_value1 AND field2 = cursor_value2 AND field3 > cursor_value3
/// ...
and so on /// /// Finally: all fields equal AND cid > cursor_cid /// /// # Arguments /// * `decoded_cursor` - The decoded cursor components /// * `sort_by` - Optional array of sort fields /// * `param_count` - Mutable counter for parameter numbering /// * `field_types` - Optional array indicating if each field is a datetime /// /// # Returns /// Tuple of (where_condition_sql, bind_values) pub fn build_cursor_where_condition( decoded_cursor: &DecodedCursor, sort_by: Option<&Vec>, param_count: &mut usize, field_types: Option<&[bool]>, ) -> (String, Vec) { let mut bind_values = Vec::new(); let mut clauses = Vec::new(); let sort_fields = match sort_by { Some(fields) if !fields.is_empty() => fields, _ => { // No sort fields, shouldn't happen but handle gracefully return ("1=1".to_string(), vec![]); } }; // Build progressive equality checks for each level for i in 0..sort_fields.len() { let mut clause_parts = Vec::new(); // Add equality checks for all previous fields for (j, sort_field) in sort_fields.iter().enumerate().take(i) { let field = &sort_field.field; let cursor_value = &decoded_cursor.field_values[j]; let is_datetime = field_types.and_then(|types| types.get(j).copied()).unwrap_or(false); let field_ref = build_field_reference(field, is_datetime); let param_cast = if is_datetime { "::timestamp" } else { "" }; clause_parts.push(format!("{} = ${}{}", field_ref, param_count, param_cast)); *param_count += 1; bind_values.push(cursor_value.clone()); } // Add comparison for current field let field = &sort_fields[i].field; let cursor_value = &decoded_cursor.field_values[i]; let direction = &sort_fields[i].direction; let is_datetime = field_types.and_then(|types| types.get(i).copied()).unwrap_or(false); let comparison_op = if direction.to_lowercase() == "desc" { "<" } else { ">" }; let field_ref = build_field_reference(field, is_datetime); let param_cast = if is_datetime { "::timestamp" } else { "" }; clause_parts.push(format!("{} {} ${}{}", field_ref, comparison_op, 
param_count, param_cast)); *param_count += 1; bind_values.push(cursor_value.clone()); // Combine with AND clauses.push(format!("({})", clause_parts.join(" AND "))); } // Add final clause: all fields equal AND cid comparison let mut final_clause_parts = Vec::new(); for (j, field) in sort_fields.iter().enumerate() { let cursor_value = &decoded_cursor.field_values[j]; let is_datetime = field_types.and_then(|types| types.get(j).copied()).unwrap_or(false); let field_ref = build_field_reference(&field.field, is_datetime); let param_cast = if is_datetime { "::timestamp" } else { "" }; final_clause_parts.push(format!("{} = ${}{}", field_ref, param_count, param_cast)); *param_count += 1; bind_values.push(cursor_value.clone()); } // CID comparison uses the direction of the last sort field let last_direction = &sort_fields[sort_fields.len() - 1].direction; let cid_comparison_op = if last_direction.to_lowercase() == "desc" { "<" } else { ">" }; final_clause_parts.push(format!("cid {} ${}", cid_comparison_op, param_count)); *param_count += 1; bind_values.push(decoded_cursor.cid.clone()); clauses.push(format!("({})", final_clause_parts.join(" AND "))); // Combine all clauses with OR let where_condition = format!("({})", clauses.join(" OR ")); (where_condition, bind_values) } /// Builds a field reference for SQL queries. /// /// Handles table columns, JSON fields, and nested paths with optional timestamp casting. 
pub(super) fn build_field_reference(field: &str, is_datetime: bool) -> String { // Table columns don't need JSON extraction if matches!(field, "uri" | "cid" | "did" | "collection" | "indexed_at") { return field.to_string(); } // Build JSON path for nested or simple fields let json_path = if field.contains('.') { let parts: Vec<&str> = field.split('.').collect(); let mut path = String::from("json"); for (i, part) in parts.iter().enumerate() { if i == parts.len() - 1 { path.push_str(&format!("->>'{}'", part)); } else { path.push_str(&format!("->'{}'", part)); } } path } else { format!("json->>'{}'", field) }; // Add timestamp cast if needed if is_datetime { format!("({})::timestamp", json_path) } else { json_path } } #[cfg(test)] mod tests { use super::*; use chrono::Utc; fn create_test_record() -> Record { Record { uri: "at://did:plc:test/app.bsky.feed.post/123".to_string(), cid: "bafytest123".to_string(), did: "did:plc:test".to_string(), collection: "app.bsky.feed.post".to_string(), json: serde_json::json!({ "text": "Hello world", "createdAt": "2025-01-15T12:00:00Z", "nested": { "field": "value" } }), indexed_at: Utc::now(), slice_uri: Some("at://did:plc:slice/network.slices.slice/abc".to_string()), } } #[test] fn test_generate_cursor_no_sort() { let record = create_test_record(); let cursor = generate_cursor_from_record(&record, None); let decoded = general_purpose::URL_SAFE_NO_PAD.decode(&cursor).unwrap(); let decoded_str = String::from_utf8(decoded).unwrap(); assert_eq!(decoded_str, "bafytest123"); } #[test] fn test_generate_cursor_with_sort() { let record = create_test_record(); let sort_by = vec![ SortField { field: "text".to_string(), direction: "desc".to_string(), }, ]; let cursor = generate_cursor_from_record(&record, Some(&sort_by)); let decoded = general_purpose::URL_SAFE_NO_PAD.decode(&cursor).unwrap(); let decoded_str = String::from_utf8(decoded).unwrap(); assert_eq!(decoded_str, "Hello world|bafytest123"); } #[test] fn test_decode_cursor_single_field() 
{ let sort_by = vec![ SortField { field: "createdAt".to_string(), direction: "desc".to_string(), }, ]; let cursor_content = "2025-01-15T12:00:00Z|bafytest123"; let cursor = general_purpose::URL_SAFE_NO_PAD.encode(cursor_content); let decoded = decode_cursor(&cursor, Some(&sort_by)).unwrap(); assert_eq!(decoded.field_values, vec!["2025-01-15T12:00:00Z"]); assert_eq!(decoded.cid, "bafytest123"); } #[test] fn test_decode_cursor_multiple_fields() { let sort_by = vec![ SortField { field: "text".to_string(), direction: "desc".to_string(), }, SortField { field: "createdAt".to_string(), direction: "desc".to_string(), }, ]; let cursor_content = "Hello world|2025-01-15T12:00:00Z|bafytest123"; let cursor = general_purpose::URL_SAFE_NO_PAD.encode(cursor_content); let decoded = decode_cursor(&cursor, Some(&sort_by)).unwrap(); assert_eq!(decoded.field_values, vec!["Hello world", "2025-01-15T12:00:00Z"]); assert_eq!(decoded.cid, "bafytest123"); } #[test] fn test_decode_cursor_invalid_format() { let sort_by = vec![ SortField { field: "text".to_string(), direction: "desc".to_string(), }, ]; let cursor_content = "bafytest123"; let cursor = general_purpose::URL_SAFE_NO_PAD.encode(cursor_content); let result = decode_cursor(&cursor, Some(&sort_by)); assert!(result.is_err()); } #[test] fn test_build_field_reference_table_column() { assert_eq!(build_field_reference("uri", false), "uri"); assert_eq!(build_field_reference("cid", false), "cid"); assert_eq!(build_field_reference("indexed_at", false), "indexed_at"); } #[test] fn test_build_field_reference_json_field() { assert_eq!(build_field_reference("text", false), "json->>'text'"); assert_eq!( build_field_reference("text", true), "(json->>'text')::timestamp" ); } #[test] fn test_build_field_reference_nested_json() { assert_eq!( build_field_reference("nested.field", false), "json->'nested'->>'field'" ); assert_eq!( build_field_reference("nested.field", true), "(json->'nested'->>'field')::timestamp" ); } #[test] fn 
test_extract_field_value_table_columns() { let record = create_test_record(); assert_eq!(extract_field_value(&record, "uri"), record.uri); assert_eq!(extract_field_value(&record, "cid"), record.cid); assert_eq!(extract_field_value(&record, "did"), record.did); assert_eq!(extract_field_value(&record, "collection"), record.collection); } #[test] fn test_extract_field_value_json() { let record = create_test_record(); assert_eq!(extract_field_value(&record, "text"), "Hello world"); assert_eq!(extract_field_value(&record, "createdAt"), "2025-01-15T12:00:00Z"); } #[test] fn test_extract_field_value_nested_json() { let record = create_test_record(); assert_eq!(extract_field_value(&record, "nested.field"), "value"); } #[test] fn test_extract_field_value_missing() { let record = create_test_record(); assert_eq!(extract_field_value(&record, "nonexistent"), "NULL"); assert_eq!(extract_field_value(&record, "nested.nonexistent"), "NULL"); } }