Scalable and distributed custom feed generator, ott - on that topic
at main 48 lines 1.5 kB view raw
1use std::str::FromStr; 2use std::sync::OnceLock; 3 4use eyre::eyre; 5use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleExtraParams, SmartModuleInitError}; 6use fluvio_smartmodule::{RecordData, Result, SmartModuleRecord, smartmodule}; 7use serde_json::Value; 8 9static KEY: OnceLock<String> = OnceLock::new(); 10static DELETE: OnceLock<bool> = OnceLock::new(); 11 12#[smartmodule(map)] 13pub fn map(record: &SmartModuleRecord) -> Result<(Option<RecordData>, RecordData)> { 14 let string = std::str::from_utf8(record.value.as_ref())?; 15 let mut value = Value::from_str(string)?; 16 let obj = value 17 .as_object_mut() 18 .ok_or(eyre!("Failed to parse value"))?; 19 20 let field_key = KEY.get().expect("Invalid state"); 21 let record_key = if DELETE.get().is_some() { 22 obj.remove(field_key) 23 .ok_or(eyre!(format!("Key missing in record")))? 24 .to_string() 25 } else { 26 obj.get(field_key) 27 .ok_or(eyre!("Field missing in record"))? 28 .to_string() 29 }; 30 31 Ok((Some(record_key.into()), value.to_string().as_str().into())) 32} 33 34#[smartmodule(init)] 35fn init(params: SmartModuleExtraParams) -> Result<()> { 36 if params.get("delete").is_some() { 37 DELETE 38 .set(true) 39 .map_err(|_| eyre!("Failed to set input param"))?; 40 } 41 42 if let Some(key) = params.get("key") { 43 KEY.set(key.clone()) 44 .map_err(|err| eyre!("failed setting key: {:#?}", err)) 45 } else { 46 Err(SmartModuleInitError::MissingParam("key".to_string()).into()) 47 } 48}