Rust AppView - highly experimental!

init: parakeet-consumer

Changed files
+687
parakeet-consumer
+51
parakeet-consumer/Cargo.toml
··· 1 + [package] 2 + name = "parakeet-consumer" 3 + version = "0.1.0" 4 + edition = "2021" 5 + 6 + [dependencies] 7 + # Workspace dependencies 8 + parakeet-db = { path = "../parakeet-db" } 9 + 10 + # AT Protocol 11 + jacquard-api = { workspace = true } 12 + jacquard-common = { workspace = true } 13 + 14 + # Async runtime 15 + tokio = { workspace = true, features = ["full"] } 16 + async-trait = { workspace = true } 17 + 18 + # WebSocket 19 + tokio-tungstenite = { version = "0.21", features = ["native-tls"] } 20 + 21 + # Database 22 + diesel = { workspace = true, features = ["postgres", "chrono", "uuid", "serde_json"] } 23 + diesel-async = { workspace = true, features = ["deadpool", "postgres"] } 24 + deadpool-diesel = { workspace = true, features = ["postgres"] } 25 + 26 + # Serialization 27 + serde = { workspace = true, features = ["derive"] } 28 + serde_json = { workspace = true } 29 + 30 + # Time handling 31 + chrono = { workspace = true, features = ["serde"] } 32 + 33 + # Caching 34 + moka = { version = "0.12", features = ["future"] } 35 + 36 + # Error handling 37 + thiserror = { workspace = true } 38 + 39 + # Logging 40 + tracing = { workspace = true } 41 + 42 + # Utilities 43 + futures = { workspace = true } 44 + # bytes = { workspace = true } 45 + # uuid = { workspace = true, features = ["v4", "serde"] } 46 + 47 + [dev-dependencies] 48 + tokio-test = "0.4" 49 + mockall = "0.12" 50 + proptest = "1.4" 51 + tracing-subscriber = "0.3"
+32
parakeet-consumer/src/core/actor_store.rs
··· 1 + use crate::core::{ActorBackend, StorageError}; 2 + use moka::future::Cache; 3 + use std::sync::Arc; 4 + use std::time::Duration; 5 + 6 + pub struct ActorIdStore<B: ActorBackend> { 7 + backend: Arc<B>, 8 + cache: Cache<String, i64>, 9 + } 10 + 11 + impl<B: ActorBackend> ActorIdStore<B> { 12 + pub fn new(backend: Arc<B>, cache_capacity: u64, cache_ttl: Duration) -> Self { 13 + Self { 14 + backend, 15 + cache: Cache::builder() 16 + .max_capacity(cache_capacity) 17 + .time_to_live(cache_ttl) 18 + .build(), 19 + } 20 + } 21 + pub async fn get(&self, did: &str) -> Result<i64, StorageError> { 22 + if let Some(id) = self.cache.get(did).await { 23 + return Ok(id); 24 + } 25 + let id = self.backend.get_actor_id(did).await?; 26 + self.cache.insert(did.to_string(), id).await; 27 + Ok(id) 28 + } 29 + pub async fn clear_cache(&self) { 30 + self.cache.invalidate_all(); 31 + } 32 + }
+18
parakeet-consumer/src/core/event_source.rs
··· 1 + use async_trait::async_trait; 2 + use std::fmt::Debug; 3 + 4 + pub trait Event: Send + Sync + Debug { 5 + fn id(&self) -> i64; 6 + fn event_type(&self) -> &str; 7 + } 8 + 9 + #[async_trait] 10 + pub trait EventSource: Send + Sync { 11 + type Event: Event; 12 + type Error: std::error::Error + Send + Sync + 'static; 13 + 14 + async fn connect(&mut self) -> Result<(), Self::Error>; 15 + async fn next_event(&mut self) -> Result<Self::Event, Self::Error>; 16 + fn is_connected(&self) -> bool; 17 + async fn disconnect(&mut self) -> Result<(), Self::Error>; 18 + }
+55
parakeet-consumer/src/core/macros.rs
··· 1 + #[macro_export] 2 + macro_rules! define_record { 3 + ( 4 + struct_name: $name:ident, 5 + field_name: $field:ident, 6 + field_type: $field_type:ty, 7 + deserialize_type: $deser_type:ty, 8 + db_method: $db_method:ident 9 + ) => { 10 + #[derive(Debug, Clone)] 11 + pub struct $name<'a> { 12 + pub $field: $field_type, 13 + pub actor_id: i64, 14 + pub cid: String, 15 + pub uri: String, 16 + } 17 + 18 + impl $crate::sources::tap::processor::FromTapRecord for $name<'static> { 19 + fn from_tap_record( 20 + record: &$crate::sources::tap::types::TapRecord, 21 + actor_id: i64, 22 + ) -> Result<Self, $crate::core::StorageError> { 23 + let val = record 24 + .record 25 + .as_ref() 26 + .ok_or_else(|| $crate::core::StorageError::Parse("Missing record".into()))?; 27 + let json_str = serde_json::to_string(val) 28 + .map_err(|e| $crate::core::StorageError::Parse(e.to_string()))?; 29 + let $field = serde_json::from_str::<$deser_type>(&json_str) 30 + .map_err(|e| $crate::core::StorageError::Parse(e.to_string()))? 31 + .into_static(); 32 + Ok($name { 33 + $field, 34 + actor_id, 35 + cid: record 36 + .cid 37 + .as_ref() 38 + .ok_or_else(|| $crate::core::StorageError::Parse("Missing CID".into()))? 39 + .clone(), 40 + uri: format!("at://{}/{}/{}", record.did, record.collection, record.rkey), 41 + }) 42 + } 43 + } 44 + 45 + #[async_trait::async_trait] 46 + impl $crate::sources::tap::processor::DatabaseWritable for $name<'static> { 47 + async fn write_to_db<DB: $crate::core::StorageBackend + ?Sized>( 48 + &self, 49 + db: &DB, 50 + ) -> Result<(), $crate::core::StorageError> { 51 + db.$db_method(self).await 52 + } 53 + } 54 + }; 55 + }
+38
parakeet-consumer/src/core/storage.rs
··· 1 + use crate::records::{Follow, Like, Post, Profile, Repost}; 2 + use async_trait::async_trait; 3 + 4 + #[derive(Debug, thiserror::Error)] 5 + pub enum StorageError { 6 + #[error("Parse error: {0}")] 7 + Parse(String), 8 + 9 + #[error("Connection error: {0}")] 10 + Connection(String), 11 + 12 + #[error("Query error: {0}")] 13 + Query(String), 14 + 15 + #[error("Transaction error: {0}")] 16 + Transaction(String), 17 + 18 + #[error("Not found")] 19 + NotFound, 20 + 21 + #[error("Constraint violation: {0}")] 22 + ConstraintViolation(String), 23 + } 24 + 25 + #[async_trait] 26 + pub trait StorageBackend: Send + Sync { 27 + async fn upsert_post(&self, post: &Post<'static>) -> Result<(), StorageError>; 28 + async fn upsert_profile(&self, profile: &Profile<'static>) -> Result<(), StorageError>; 29 + async fn create_follow(&self, follow: &Follow<'static>) -> Result<(), StorageError>; 30 + async fn create_like(&self, like: &Like<'static>) -> Result<(), StorageError>; 31 + async fn create_repost(&self, repost: &Repost<'static>) -> Result<(), StorageError>; 32 + async fn delete_record(&self, uri: &str) -> Result<(), StorageError>; 33 + } 34 + 35 + #[async_trait] 36 + pub trait ActorBackend: Send + Sync { 37 + async fn get_actor_id(&self, did: &str) -> Result<i64, StorageError>; 38 + }
+44
parakeet-consumer/src/lib.rs
··· 1 + pub mod core { 2 + pub mod actor_store; 3 + pub mod event_source; 4 + pub mod macros; 5 + pub mod storage; 6 + 7 + pub use event_source::{Event, EventSource}; 8 + pub use storage::{ActorBackend, StorageBackend, StorageError}; 9 + } 10 + 11 + pub mod sources { 12 + pub mod tap { 13 + pub mod client; 14 + pub mod processor; 15 + pub mod types; 16 + 17 + pub use client::{ReconnectingTapClient, TapClient}; 18 + pub use processor::{spawn_worker, Dispatcher, EventProcessor}; 19 + pub use types::{ 20 + IdentityData, LabelData, RepoStatus, TapAction, TapError, TapEvent, TapRecord, 21 + }; 22 + } 23 + } 24 + 25 + pub mod records { 26 + pub mod app_bsky; 27 + 28 + pub use app_bsky::{Follow, Like, Post, Profile, Repost}; 29 + } 30 + 31 + pub mod storage { 32 + // pub mod postgres; 33 + 34 + // pub use postgres::PostgresBackend; 35 + } 36 + 37 + pub use core::actor_store::ActorIdStore; 38 + pub use core::{ActorBackend, Event, EventSource, StorageBackend, StorageError}; 39 + pub use records::{Follow, Like, Post, Profile, Repost}; 40 + 41 + pub use sources::tap::{ 42 + spawn_worker, Dispatcher, EventProcessor, ReconnectingTapClient, TapAction, TapClient, 43 + TapError, TapEvent, TapRecord, 44 + };
+43
parakeet-consumer/src/records/app_bsky.rs
··· 1 + use crate::define_record; 2 + use jacquard_api::app_bsky; 3 + use jacquard_common::IntoStatic; 4 + 5 + define_record!( 6 + struct_name: Post, 7 + field_name: post, 8 + field_type: app_bsky::feed::post::Post<'a>, 9 + deserialize_type: app_bsky::feed::post::Post<'_>, 10 + db_method: upsert_post 11 + ); 12 + 13 + define_record!( 14 + struct_name: Profile, 15 + field_name: profile, 16 + field_type: app_bsky::actor::profile::Profile<'a>, 17 + deserialize_type: app_bsky::actor::profile::Profile<'_>, 18 + db_method: upsert_profile 19 + ); 20 + 21 + define_record!( 22 + struct_name: Follow, 23 + field_name: follow, 24 + field_type: app_bsky::graph::follow::Follow<'a>, 25 + deserialize_type: app_bsky::graph::follow::Follow<'_>, 26 + db_method: create_follow 27 + ); 28 + 29 + define_record!( 30 + struct_name: Like, 31 + field_name: like, 32 + field_type: app_bsky::feed::like::Like<'a>, 33 + deserialize_type: app_bsky::feed::like::Like<'_>, 34 + db_method: create_like 35 + ); 36 + 37 + define_record!( 38 + struct_name: Repost, 39 + field_name: repost, 40 + field_type: app_bsky::feed::repost::Repost<'a>, 41 + deserialize_type: app_bsky::feed::repost::Repost<'_>, 42 + db_method: create_repost 43 + );
+166
parakeet-consumer/src/sources/tap/client.rs
··· 1 + use super::types::{TapError, TapEvent}; 2 + use crate::core::{Event, EventSource}; 3 + use async_trait::async_trait; 4 + use futures::{SinkExt, StreamExt}; 5 + use std::time::Duration; 6 + use tokio::net::TcpStream; 7 + use tokio_tungstenite::tungstenite::Message; 8 + use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; 9 + 10 + pub struct TapClient { 11 + url: String, 12 + ws: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>, 13 + } 14 + 15 + impl TapClient { 16 + pub fn new(url: impl Into<String>) -> Self { 17 + let mut url = url.into(); 18 + url = url 19 + .replacen("http://", "ws://", 1) 20 + .replacen("https://", "wss://", 1); 21 + if !url.ends_with("/channel") { 22 + if !url.ends_with('/') { 23 + url.push('/'); 24 + } 25 + url.push_str("channel"); 26 + } 27 + Self { url, ws: None } 28 + } 29 + 30 + async fn send_ack(&mut self, event_id: i64) -> Result<(), TapError> { 31 + match &mut self.ws { 32 + Some(ws) => ws 33 + .send(Message::Text(format!( 34 + r#"{{"type":"ack","id":{}}}"#, 35 + event_id 36 + ))) 37 + .await 38 + .map_err(|e| TapError::Connection(e.to_string())), 39 + None => Err(TapError::Disconnected), 40 + } 41 + } 42 + } 43 + 44 + #[async_trait] 45 + impl EventSource for TapClient { 46 + type Event = TapEvent; 47 + type Error = TapError; 48 + 49 + async fn connect(&mut self) -> Result<(), TapError> { 50 + self.ws = Some( 51 + connect_async(&self.url) 52 + .await 53 + .map_err(|e| TapError::Connection(e.to_string()))? 54 + .0, 55 + ); 56 + Ok(()) 57 + } 58 + 59 + async fn next_event(&mut self) -> Result<TapEvent, TapError> { 60 + loop { 61 + let ws = self.ws.as_mut().ok_or(TapError::Disconnected)?; 62 + match ws.next().await { 63 + Some(Ok(Message::Text(text))) => { 64 + if let Ok(event) = serde_json::from_str::<TapEvent>(&text) { 65 + let _ = self.send_ack(event.id()).await; 66 + return Ok(event); 67 + } 68 + } 69 + Some(Ok(Message::Ping(data))) => { 70 + ws.send(Message::Pong(data)) 71 + .await 72 + .map_err(|e| TapError::Connection(e.to_string()))?; 73 + } 74 + Some(Ok(Message::Close(_))) | None => { 75 + self.ws = None; 76 + return Err(TapError::Disconnected); 77 + } 78 + Some(Err(e)) => { 79 + self.ws = None; 80 + return Err(TapError::Connection(e.to_string())); 81 + } 82 + _ => {} 83 + } 84 + } 85 + } 86 + 87 + fn is_connected(&self) -> bool { 88 + self.ws.is_some() 89 + } 90 + async fn disconnect(&mut self) -> Result<(), TapError> { 91 + self.ws.take(); 92 + Ok(()) 93 + } 94 + } 95 + 96 + pub struct ReconnectingTapClient { 97 + url: String, 98 + inner: Option<TapClient>, 99 + max_retries: usize, 100 + reconnect_delay: Duration, 101 + } 102 + 103 + impl ReconnectingTapClient { 104 + pub fn new(url: impl Into<String>) -> Self { 105 + let url = url.into(); 106 + Self { 107 + url: url.clone(), 108 + inner: Some(TapClient::new(url)), 109 + max_retries: 10, 110 + reconnect_delay: Duration::from_secs(1), 111 + } 112 + } 113 + async fn ensure_connected(&mut self) -> Result<(), TapError> { 114 + if self.inner.as_ref().is_some_and(|c| c.is_connected()) { 115 + return Ok(()); 116 + } 117 + let (mut retries, mut delay) = (0, self.reconnect_delay); 118 + while retries < self.max_retries { 119 + let mut client = TapClient::new(self.url.clone()); 120 + if client.connect().await.is_ok() { 121 + self.inner = Some(client); 122 + return Ok(()); 123 + } 124 + retries += 1; 125 + if retries < self.max_retries { 126 + tokio::time::sleep(delay).await; 127 + delay = (delay * 2).min(Duration::from_secs(60)); 128 + } 129 + } 130 + Err(TapError::Connection(format!( 131 + "Failed after {} attempts", 132 + self.max_retries 133 + ))) 134 + } 135 + } 136 + 137 + #[async_trait] 138 + impl EventSource for ReconnectingTapClient { 139 + type Event = TapEvent; 140 + type Error = TapError; 141 + async fn connect(&mut self) -> Result<(), TapError> { 142 + self.ensure_connected().await 143 + } 144 + async fn next_event(&mut self) -> Result<TapEvent, TapError> { 145 + loop { 146 + self.ensure_connected().await?; 147 + if let Some(client) = &mut self.inner { 148 + match client.next_event().await { 149 + Ok(event) => return Ok(event), 150 + Err(TapError::Disconnected) => self.inner = None, 151 + Err(e) => return Err(e), 152 + } 153 + } 154 + } 155 + } 156 + fn is_connected(&self) -> bool { 157 + self.inner.as_ref().is_some_and(|c| c.is_connected()) 158 + } 159 + async fn disconnect(&mut self) -> Result<(), TapError> { 160 + if let Some(mut client) = self.inner.take() { 161 + client.disconnect().await 162 + } else { 163 + Ok(()) 164 + } 165 + } 166 + }
+149
parakeet-consumer/src/sources/tap/processor.rs
··· 1 + use super::types::{TapAction, TapEvent, TapRecord}; 2 + use crate::core::actor_store::ActorIdStore; 3 + use crate::core::{ActorBackend, Event, StorageBackend, StorageError}; 4 + use crate::records::{Follow, Like, Post, Profile, Repost}; 5 + use async_trait::async_trait; 6 + use std::sync::Arc; 7 + use tokio::sync::mpsc; 8 + use tokio::task::JoinHandle; 9 + use tracing::{error, info}; 10 + 11 + pub trait FromTapRecord: Sized { 12 + fn from_tap_record(record: &TapRecord, actor_id: i64) -> Result<Self, StorageError>; 13 + } 14 + 15 + #[async_trait] 16 + pub trait DatabaseWritable { 17 + async fn write_to_db<DB: StorageBackend + ?Sized>(&self, db: &DB) -> Result<(), StorageError>; 18 + } 19 + 20 + const COLLECTIONS: &[&str] = &[ 21 + "app.bsky.feed.post", 22 + "app.bsky.actor.profile", 23 + "app.bsky.graph.follow", 24 + "app.bsky.feed.like", 25 + "app.bsky.feed.repost", 26 + ]; 27 + 28 + macro_rules! process_collection { 29 + ($event:expr, $db:expr, $store:expr) => { 30 + match $event { 31 + TapEvent::Record { record, .. } => match record.collection.as_str() { 32 + "app.bsky.feed.post" => process_record::<Post, _>($event, $db, $store).await, 33 + "app.bsky.actor.profile" => process_record::<Profile, _>($event, $db, $store).await, 34 + "app.bsky.graph.follow" => process_record::<Follow, _>($event, $db, $store).await, 35 + "app.bsky.feed.like" => process_record::<Like, _>($event, $db, $store).await, 36 + "app.bsky.feed.repost" => process_record::<Repost, _>($event, $db, $store).await, 37 + _ => Ok(()), 38 + }, 39 + _ => Ok(()), 40 + } 41 + }; 42 + } 43 + 44 + pub fn spawn_worker<AB: ActorBackend + 'static>( 45 + collection: &'static str, 46 + mut rx: mpsc::Receiver<TapEvent>, 47 + db: Arc<dyn StorageBackend>, 48 + actor_store: Arc<ActorIdStore<AB>>, 49 + ) -> JoinHandle<()> { 50 + tokio::spawn(async move { 51 + info!("Worker started for collection: {}", collection); 52 + while let Some(event) = rx.recv().await { 53 + if let Err(e) = process_collection!(&event, db.as_ref(), &actor_store) { 54 + error!( 55 + "Failed to process {} event {}: {}", 56 + collection, 57 + event.id(), 58 + e 59 + ); 60 + } 61 + } 62 + info!("Worker stopped for collection: {}", collection); 63 + }) 64 + } 65 + 66 + async fn process_record<T, AB>( 67 + event: &TapEvent, 68 + db: &dyn StorageBackend, 69 + actor_store: &Arc<ActorIdStore<AB>>, 70 + ) -> Result<(), StorageError> 71 + where 72 + T: FromTapRecord + DatabaseWritable + Send + 'static, 73 + AB: ActorBackend, 74 + { 75 + let TapEvent::Record { record, .. } = event else { 76 + return Ok(()); 77 + }; 78 + match record.action { 79 + TapAction::Create | TapAction::Update => { 80 + T::from_tap_record(record, actor_store.get(&record.did).await?)? 81 + .write_to_db(db) 82 + .await 83 + } 84 + TapAction::Delete => { 85 + db.delete_record(&format!( 86 + "at://{}/{}/{}", 87 + record.did, record.collection, record.rkey 88 + )) 89 + .await 90 + } 91 + } 92 + } 93 + 94 + pub struct Dispatcher { 95 + channels: std::collections::HashMap<String, mpsc::Sender<TapEvent>>, 96 + workers: Vec<JoinHandle<()>>, 97 + } 98 + 99 + impl Dispatcher { 100 + pub fn new<AB: ActorBackend + 'static>( 101 + db: Arc<dyn StorageBackend>, 102 + actor_store: Arc<ActorIdStore<AB>>, 103 + channel_size: usize, 104 + ) -> Self { 105 + let (channels, workers) = COLLECTIONS.iter().fold( 106 + (std::collections::HashMap::new(), Vec::new()), 107 + |(mut ch, mut w), c| { 108 + let (tx, rx) = mpsc::channel(channel_size); 109 + ch.insert(c.to_string(), tx); 110 + w.push(spawn_worker(c, rx, db.clone(), actor_store.clone())); 111 + (ch, w) 112 + }, 113 + ); 114 + Self { channels, workers } 115 + } 116 + 117 + pub async fn dispatch(&self, event: TapEvent) -> Result<(), StorageError> { 118 + let TapEvent::Record { ref record, .. } = event else { 119 + return Ok(()); 120 + }; 121 + if let Some(tx) = self.channels.get(&record.collection) { 122 + tx.send(event) 123 + .await 124 + .map_err(|_| StorageError::Query("Worker channel closed".into()))?; 125 + } 126 + Ok(()) 127 + } 128 + 129 + pub async fn shutdown(self) { 130 + drop(self.channels); 131 + for worker in self.workers { 132 + let _ = worker.await; 133 + } 134 + } 135 + } 136 + 137 + pub struct EventProcessor<DB: StorageBackend, AB: ActorBackend> { 138 + db: Arc<DB>, 139 + actor_store: Arc<ActorIdStore<AB>>, 140 + } 141 + 142 + impl<DB: StorageBackend, AB: ActorBackend> EventProcessor<DB, AB> { 143 + pub fn new(db: Arc<DB>, actor_store: Arc<ActorIdStore<AB>>) -> Self { 144 + Self { db, actor_store } 145 + } 146 + pub async fn process_event(&self, event: &TapEvent) -> Result<(), StorageError> { 147 + process_collection!(event, self.db.as_ref(), &self.actor_store) 148 + } 149 + }
+91
parakeet-consumer/src/sources/tap/types.rs
··· 1 + use crate::core::Event; 2 + use serde::{Deserialize, Serialize}; 3 + 4 + #[derive(Debug, Clone, Deserialize, Serialize)] 5 + #[serde(tag = "type", rename_all = "lowercase")] 6 + pub enum TapEvent { 7 + Record { id: i64, record: TapRecord }, 8 + Identity { id: i64, identity: IdentityData }, 9 + Label { id: i64, label: LabelData }, 10 + } 11 + 12 + impl Event for TapEvent { 13 + fn id(&self) -> i64 { 14 + match self { 15 + TapEvent::Record { id, .. } 16 + | TapEvent::Identity { id, .. } 17 + | TapEvent::Label { id, .. } => *id, 18 + } 19 + } 20 + fn event_type(&self) -> &str { 21 + match self { 22 + TapEvent::Record { .. } => "record", 23 + TapEvent::Identity { .. } => "identity", 24 + TapEvent::Label { .. } => "label", 25 + } 26 + } 27 + } 28 + 29 + #[derive(Debug, Clone, Deserialize, Serialize)] 30 + pub struct TapRecord { 31 + pub live: bool, 32 + pub rev: String, 33 + pub did: String, 34 + pub collection: String, 35 + pub rkey: String, 36 + pub action: TapAction, 37 + pub cid: Option<String>, 38 + pub record: Option<serde_json::Value>, 39 + } 40 + 41 + #[derive(Debug, Clone, Deserialize, Serialize)] 42 + pub struct IdentityData { 43 + pub did: String, 44 + pub handle: String, 45 + #[serde(rename = "isActive")] 46 + pub is_active: bool, 47 + pub status: RepoStatus, 48 + } 49 + 50 + #[derive(Debug, Clone, Deserialize, Serialize)] 51 + pub struct LabelData { 52 + pub live: bool, 53 + #[serde(rename = "labelerDID")] 54 + pub labeler_did: String, 55 + pub uri: String, 56 + pub val: String, 57 + pub cts: String, 58 + pub src: String, 59 + pub cid: Option<String>, 60 + pub neg: bool, 61 + } 62 + 63 + #[derive(Debug, Clone, Deserialize, Serialize)] 64 + #[serde(rename_all = "lowercase")] 65 + pub enum TapAction { 66 + Create, 67 + Update, 68 + Delete, 69 + } 70 + 71 + #[derive(Debug, Clone, Deserialize, Serialize)] 72 + #[serde(rename_all = "lowercase")] 73 + pub enum RepoStatus { 74 + Active, 75 + Takendown, 76 + Suspended, 77 + Deactivated, 78 + Deleted, 79 + } 80 + 81 + #[derive(Debug, thiserror::Error)] 82 + pub enum TapError { 83 + #[error("Connection error: {0}")] 84 + Connection(String), 85 + #[error("Disconnected from tap")] 86 + Disconnected, 87 + #[error("Deserialization error: {0}")] 88 + Deserialization(String), 89 + #[error("IO error: {0}")] 90 + Io(#[from] std::io::Error), 91 + }