this repo has no description
at main 3.1 kB view raw
1use std::collections::HashSet; 2 3use anyhow::{anyhow, Result}; 4use chrono::Duration; 5use serde::Deserialize; 6use tokio::time::{sleep, Instant}; 7use tokio_util::sync::CancellationToken; 8 9use crate::storage::{verifcation_method_insert, verification_method_cleanup, StoragePool}; 10 11#[derive(Deserialize)] 12struct VerificationMethod { 13 #[serde(rename = "publicKeyMultibase")] 14 public_key_multibase: String, 15} 16 17#[derive(Deserialize)] 18struct ResolvedPlcDid { 19 id: String, 20 #[serde(rename = "verificationMethod")] 21 verification_method: Vec<VerificationMethod>, 22} 23 24pub struct VerificationMethodCacheTask { 25 pool: StoragePool, 26 http_client: reqwest::Client, 27 plc_hostname: String, 28 dids: HashSet<String>, 29 cancellation_token: CancellationToken, 30} 31 32impl VerificationMethodCacheTask { 33 pub fn new( 34 pool: StoragePool, 35 http_client: reqwest::Client, 36 plc_hostname: String, 37 dids: HashSet<String>, 38 cancellation_token: CancellationToken, 39 ) -> Self { 40 Self { 41 pool, 42 http_client, 43 plc_hostname, 44 dids, 45 cancellation_token, 46 } 47 } 48 49 pub async fn run_background(&self, interval: Duration) -> Result<()> { 50 let interval = interval.to_std()?; 51 52 let sleeper = sleep(interval); 53 tokio::pin!(sleeper); 54 55 loop { 56 tokio::select! { 57 () = self.cancellation_token.cancelled() => { 58 break; 59 }, 60 () = &mut sleeper => { 61 62 if let Err(err) = self.main().await { 63 tracing::error!("StatsTask task failed: {}", err); 64 } 65 66 67 sleeper.as_mut().reset(Instant::now() + interval); 68 } 69 } 70 } 71 Ok(()) 72 } 73 74 pub async fn main(&self) -> Result<()> { 75 for did in &self.dids { 76 let query_response = self.plc_query(did).await; 77 if let Err(err) = query_response { 78 tracing::error!(error = ?err, "Failed to query PLC for DID: {}", did); 79 continue; 80 } 81 let key = query_response.unwrap(); 82 83 verifcation_method_insert(&self.pool, did, &key).await?; 84 } 85 86 verification_method_cleanup(&self.pool).await?; 87 Ok(()) 88 } 89 90 async fn plc_query(&self, did: &str) -> Result<String> { 91 let url = if let Some(hostname) = did.strip_prefix("did:web:") { 92 format!("https://{}/.well-known/did.json", hostname) 93 } else { 94 format!("https://{}/{}", self.plc_hostname, did) 95 }; 96 97 let resolved_did: ResolvedPlcDid = self 98 .http_client 99 .get(url) 100 .timeout(Duration::seconds(10).to_std()?) 101 .send() 102 .await? 103 .json() 104 .await?; 105 106 if resolved_did.id != did { 107 return Err(anyhow!("DID mismatch")); 108 } 109 110 let key = resolved_did 111 .verification_method 112 .first() 113 .map(|value| value.public_key_multibase.clone()); 114 115 key.ok_or(anyhow!("No key found")) 116 } 117}