Scalable and distributed custom feed generator, ott - on that topic
1use std::str::FromStr;
2use std::sync::OnceLock;
3
4use eyre::eyre;
5use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleExtraParams, SmartModuleInitError};
6use fluvio_smartmodule::{RecordData, Result, SmartModuleRecord, smartmodule};
7use serde_json::Value;
8
// Name of the JSON field whose value becomes the record key.
// Written once by `init` from the `key` parameter and read by `map` for every record.
9static KEY: OnceLock<String> = OnceLock::new();
// Delete flag. `init` stores `true` here only when a `delete` parameter was supplied;
// `map` checks whether the cell has been set to decide if the key field is removed
// from the output value.
10static DELETE: OnceLock<bool> = OnceLock::new();
11
12#[smartmodule(map)]
13pub fn map(record: &SmartModuleRecord) -> Result<(Option<RecordData>, RecordData)> {
14 let string = std::str::from_utf8(record.value.as_ref())?;
15 let mut value = Value::from_str(string)?;
16 let obj = value
17 .as_object_mut()
18 .ok_or(eyre!("Failed to parse value"))?;
19
20 let field_key = KEY.get().expect("Invalid state");
21 let record_key = if DELETE.get().is_some() {
22 obj.remove(field_key)
23 .ok_or(eyre!(format!("Key missing in record")))?
24 .to_string()
25 } else {
26 obj.get(field_key)
27 .ok_or(eyre!("Field missing in record"))?
28 .to_string()
29 };
30
31 Ok((Some(record_key.into()), value.to_string().as_str().into()))
32}
33
34#[smartmodule(init)]
35fn init(params: SmartModuleExtraParams) -> Result<()> {
36 if params.get("delete").is_some() {
37 DELETE
38 .set(true)
39 .map_err(|_| eyre!("Failed to set input param"))?;
40 }
41
42 if let Some(key) = params.get("key") {
43 KEY.set(key.clone())
44 .map_err(|err| eyre!("failed setting key: {:#?}", err))
45 } else {
46 Err(SmartModuleInitError::MissingParam("key".to_string()).into())
47 }
48}