don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(jetstream,knot): restructure jetstream types

Signed-off-by: tjh <did:plc:65gha4t3avpfpzmvpbwovss7>

tjh.dev 7bf5ec4a 6b3da20c

verified
+91 -24
+71 -7
crates/jetstream/src/types.rs
··· 15 15 #[derive(Debug, Deserialize)] 16 16 #[serde(from = "ShadowEventPayload<T>")] 17 17 pub enum EventPayload<'a, T> { 18 - #[serde(borrow)] 19 - Commit(Commit<'a, T>), 18 + CommitCreate(#[serde(borrow)] CommitMeta<'a>, CommitData<'a, T>), 19 + CommitUpdate(#[serde(borrow)] CommitMeta<'a>, CommitData<'a, T>), 20 + CommitDelete(#[serde(borrow)] CommitMeta<'a>), 20 21 Account(Account<'a>), 21 22 Identity(Identity<'a>), 22 23 } ··· 43 42 impl<'a, T> From<ShadowEventPayload<'a, T>> for EventPayload<'a, T> { 44 43 fn from(value: ShadowEventPayload<'a, T>) -> Self { 45 44 match value { 46 - ShadowEventPayload::Commit { commit } => EventPayload::Commit(commit), 45 + ShadowEventPayload::Commit { 46 + commit: 47 + Commit::Create { 48 + collection, 49 + rkey, 50 + rev, 51 + cid, 52 + record, 53 + }, 54 + } => EventPayload::CommitCreate( 55 + CommitMeta { 56 + collection, 57 + rkey, 58 + rev, 59 + }, 60 + CommitData { cid, record }, 61 + ), 62 + ShadowEventPayload::Commit { 63 + commit: 64 + Commit::Update { 65 + collection, 66 + rkey, 67 + rev, 68 + cid, 69 + record, 70 + }, 71 + } => EventPayload::CommitCreate( 72 + CommitMeta { 73 + collection, 74 + rkey, 75 + rev, 76 + }, 77 + CommitData { cid, record }, 78 + ), 79 + ShadowEventPayload::Commit { 80 + commit: 81 + Commit::Delete { 82 + collection, 83 + rkey, 84 + rev, 85 + }, 86 + } => EventPayload::CommitDelete(CommitMeta { 87 + collection, 88 + rkey, 89 + rev, 90 + }), 47 91 ShadowEventPayload::Account { account } => EventPayload::Account(account), 48 92 ShadowEventPayload::Identity { identity } => EventPayload::Identity(identity), 49 93 } ··· 99 53 #[serde(rename_all = "snake_case")] 100 54 #[serde(tag = "operation")] 101 55 pub enum Commit<'a, T> { 102 - Create(#[serde(borrow)] CommitData<'a, T>), 103 - Update(#[serde(borrow)] CommitData<'a, T>), 56 + Create { 57 + #[serde(borrow)] 58 + collection: &'a Nsid, 59 + rkey: &'a str, 60 + rev: &'a str, 61 + cid: &'a str, 62 + record: T, 63 + }, 64 + Update { 65 + #[serde(borrow)] 66 + collection: &'a Nsid, 67 + rkey: &'a str, 68 + rev: &'a str, 69 + cid: &'a str, 70 + record: T, 71 + }, 104 72 Delete { 105 73 #[serde(borrow)] 106 74 collection: &'a Nsid, ··· 123 63 }, 124 64 } 125 65 126 - #[derive(Debug, Deserialize)] 127 - pub struct CommitData<'a, T> { 66 + #[derive(Debug, Hash, PartialEq, Eq, Deserialize)] 67 + pub struct CommitMeta<'a> { 128 68 #[serde(borrow)] 129 69 pub collection: &'a Nsid, 130 70 pub rkey: &'a str, 131 71 pub rev: &'a str, 72 + } 73 + 74 + #[derive(Debug, Deserialize)] 75 + pub struct CommitData<'a, T> { 132 76 pub cid: &'a str, 133 77 pub record: T, 134 78 }
+20 -17
crates/knot/src/main.rs
··· 1 + mod cli; 2 + 1 3 use axum::{ 2 4 Router, 3 5 extract::{Query, WebSocketUpgrade}, ··· 31 29 use tracing::{Span, field::Empty}; 32 30 use tracing_subscriber::{EnvFilter, layer::SubscriberExt as _, util::SubscriberInitExt as _}; 33 31 use url::Url; 34 - 35 - mod cli; 36 32 37 33 fn main() { 38 34 let stderr_layer = tracing_subscriber::fmt::layer().with_writer(std::io::stderr); ··· 204 204 { 205 205 let knot = knot.clone(); 206 206 tokio::task::spawn(async move { 207 - use jetstream::types::{Commit, EventPayload}; 207 + use jetstream::types::EventPayload; 208 208 209 209 while let Some(event) = jetstream_rx.recv_async().await { 210 - let Ok(event) = event.deserialize() else { 211 - tracing::error!("failed to deserialize event"); 212 - continue; 210 + let event = match event.deserialize() { 211 + Ok(event) => event, 212 + Err(error) => { 213 + tracing::error!(?error, "failed to deserialize event"); 214 + continue; 215 + } 213 216 }; 214 217 215 - tracing::debug!(?event); 218 + tracing::trace!(?event); 216 219 217 - if let EventPayload::Commit(Commit::Create(create)) = event.payload { 218 - tracing::info!("{create:?}"); 219 - if let Lexicon::PublicKey(public_key) = create.record { 220 - tracing::info!(?public_key); 221 - if let Ok(pk) = oauth::public_key::PublicKey::from_openssh(&public_key.key) 222 - { 223 - tracing::info!("adding public key to cache"); 224 - knot.insert_public_keys(event.did.to_owned(), [pk].into_iter()) 225 - .await; 226 - } 220 + match event.payload { 221 + EventPayload::CommitCreate(meta, commit) 222 + | EventPayload::CommitUpdate(meta, commit) => { 223 + tracing::info!(?meta, ?commit, "jetstream create/update"); 224 + } 225 + EventPayload::CommitDelete(meta) => { 226 + tracing::info!(?meta, "jetstream delete"); 227 + } 228 + EventPayload::Account(_) | EventPayload::Identity(_) => { 229 + knot.resolver().invalidate_did(event.did).await; 227 230 } 228 231 } 229 232 }