A library for ATProtocol identities.

refactor: add a minimal streaming parser for TAP event ID fields.

Changed files
+129 -3
crates
atproto-tap
+1 -1
Cargo.toml
··· 63 63 secrecy = { version = "0.10", features = ["serde"] } 64 64 serde = { version = "1.0", features = ["derive"] } 65 65 serde_ipld_dagcbor = "0.6" 66 - serde_json = "1.0" 66 + serde_json = { version = "1.0", features = ["unbounded_depth"] } 67 67 sha2 = "0.10" 68 68 thiserror = "2.0" 69 69 tokio = { version = "1.41", features = ["macros", "rt", "rt-multi-thread"] }
+112
crates/atproto-tap/src/events.rs
··· 7 7 //! - `serde_json::Value` for record payloads (allows lazy access) 8 8 9 9 use compact_str::CompactString; 10 + use serde::de::{self, Deserializer, IgnoredAny, MapAccess, Visitor}; 10 11 use serde::{Deserialize, Serialize, de::DeserializeOwned}; 12 + use std::fmt; 11 13 12 14 /// A TAP event received from the stream. 13 15 /// ··· 40 42 TapEvent::Record { id, .. } => *id, 41 43 TapEvent::Identity { id, .. } => *id, 42 44 } 45 + } 46 + } 47 + 48 + /// Extract only the event ID from a JSON string without fully parsing it. 49 + /// 50 + /// This is a fallback parser used when full `TapEvent` parsing fails (e.g., due to 51 + /// deeply nested records hitting serde_json's recursion limit). It uses `IgnoredAny` 52 + /// to efficiently skip over nested content without building data structures, allowing 53 + /// us to extract the ID for acknowledgment even when full parsing fails. 54 + /// 55 + /// # Example 56 + /// 57 + /// ``` 58 + /// use atproto_tap::extract_event_id; 59 + /// 60 + /// let json = r#"{"type":"record","id":12345,"record":{"deeply":"nested"}}"#; 61 + /// assert_eq!(extract_event_id(json), Some(12345)); 62 + /// ``` 63 + pub fn extract_event_id(json: &str) -> Option<u64> { 64 + let mut deserializer = serde_json::Deserializer::from_str(json); 65 + deserializer.disable_recursion_limit(); 66 + EventIdOnly::deserialize(&mut deserializer).ok().map(|e| e.id) 67 + } 68 + 69 + /// Internal struct for extracting only the "id" field from a TAP event. 
70 + #[derive(Debug)] 71 + struct EventIdOnly { 72 + id: u64, 73 + } 74 + 75 + impl<'de> Deserialize<'de> for EventIdOnly { 76 + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 77 + where 78 + D: Deserializer<'de>, 79 + { 80 + deserializer.deserialize_map(EventIdOnlyVisitor) 81 + } 82 + } 83 + 84 + struct EventIdOnlyVisitor; 85 + 86 + impl<'de> Visitor<'de> for EventIdOnlyVisitor { 87 + type Value = EventIdOnly; 88 + 89 + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 90 + formatter.write_str("a map with an 'id' field") 91 + } 92 + 93 + fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error> 94 + where 95 + M: MapAccess<'de>, 96 + { 97 + let mut id: Option<u64> = None; 98 + 99 + while let Some(key) = map.next_key::<&str>()? { 100 + if key == "id" { 101 + id = Some(map.next_value()?); 102 + // Found what we need - skip the rest efficiently using IgnoredAny 103 + // which handles deeply nested structures without recursion issues 104 + while map.next_entry::<IgnoredAny, IgnoredAny>()?.is_some() {} 105 + break; 106 + } else { 107 + // Skip this value without fully parsing it 108 + map.next_value::<IgnoredAny>()?; 109 + } 110 + } 111 + 112 + id.map(|id| EventIdOnly { id }) 113 + .ok_or_else(|| de::Error::missing_field("id")) 43 114 } 44 115 } 45 116 ··· 372 443 }, 373 444 }; 374 445 assert_eq!(identity_event.id(), 200); 446 + } 447 + 448 + #[test] 449 + fn test_extract_event_id_simple() { 450 + let json = r#"{"type":"record","id":12345,"record":{"deeply":"nested"}}"#; 451 + assert_eq!(extract_event_id(json), Some(12345)); 452 + } 453 + 454 + #[test] 455 + fn test_extract_event_id_at_end() { 456 + let json = r#"{"type":"record","record":{"deeply":"nested"},"id":99999}"#; 457 + assert_eq!(extract_event_id(json), Some(99999)); 458 + } 459 + 460 + #[test] 461 + fn test_extract_event_id_missing() { 462 + let json = r#"{"type":"record","record":{"deeply":"nested"}}"#; 463 + assert_eq!(extract_event_id(json), None); 464 + } 465 + 
466 + #[test] 467 + fn test_extract_event_id_invalid_json() { 468 + let json = r#"{"type":"record","id":123"#; // Truncated JSON 469 + assert_eq!(extract_event_id(json), None); 470 + } 471 + 472 + #[test] 473 + fn test_extract_event_id_deeply_nested() { 474 + // Create a deeply nested JSON that would exceed serde_json's default recursion limit 475 + let mut json = String::from(r#"{"id":42,"record":{"nested":"#); 476 + for _ in 0..200 { 477 + json.push_str("["); 478 + } 479 + json.push_str("1"); 480 + for _ in 0..200 { 481 + json.push_str("]"); 482 + } 483 + json.push_str("}}"); 484 + 485 + // extract_event_id should still work because it uses IgnoredAny with disabled recursion limit 486 + assert_eq!(extract_event_id(&json), Some(42)); 375 487 } 376 488 }
+1 -1
crates/atproto-tap/src/lib.rs
··· 115 115 pub use client::RepoStatus; 116 116 pub use config::{TapConfig, TapConfigBuilder}; 117 117 pub use errors::TapError; 118 - pub use events::{IdentityEvent, IdentityStatus, RecordAction, RecordEvent, TapEvent}; 118 + pub use events::{IdentityEvent, IdentityStatus, RecordAction, RecordEvent, TapEvent, extract_event_id}; 119 119 pub use stream::{TapStream, connect, connect_to};
+15 -1
crates/atproto-tap/src/stream.rs
··· 15 15 use crate::config::TapConfig; 16 16 use crate::connection::TapConnection; 17 17 use crate::errors::TapError; 18 - use crate::events::TapEvent; 18 + use crate::events::{TapEvent, extract_event_id}; 19 19 use futures::Stream; 20 20 use std::pin::Pin; 21 21 use std::sync::Arc; ··· 187 187 Err(err) => { 188 188 // Parse errors don't affect connection 189 189 tracing::warn!(error = %err, "Failed to parse TAP message"); 190 + 191 + // Try to extract just the ID using fallback parser 192 + // so we can still ack the message even if full parsing fails 193 + if config.send_acks { 194 + if let Some(event_id) = extract_event_id(&msg) { 195 + tracing::debug!(event_id, "Extracted event ID via fallback parser"); 196 + if let Err(ack_err) = conn.send_ack(event_id).await { 197 + tracing::warn!(error = %ack_err, "Failed to send ack for unparseable message"); 198 + } 199 + } else { 200 + tracing::warn!("Could not extract event ID from unparseable message"); 201 + } 202 + } 203 + 190 204 if event_tx.send(Err(TapError::ParseError(err.to_string()))).await.is_err() { 191 205 tracing::debug!("Event receiver dropped, closing connection"); 192 206 let _ = conn.close().await;