+4
smart-modules/assign-record-key/SmartModule.toml
+4
smart-modules/assign-record-key/SmartModule.toml
+21
-10
smart-modules/assign-record-key/src/lib.rs
+21
-10
smart-modules/assign-record-key/src/lib.rs
···
6
6
use fluvio_smartmodule::{RecordData, Result, SmartModuleRecord, smartmodule};
7
7
use serde_json::Value;
8
8
9
-
static CRITERIA: OnceLock<String> = OnceLock::new();
9
+
static KEY: OnceLock<String> = OnceLock::new();
10
+
static DELETE: OnceLock<bool> = OnceLock::new();
10
11
11
12
#[smartmodule(map)]
12
13
pub fn map(record: &SmartModuleRecord) -> Result<(Option<RecordData>, RecordData)> {
···
15
16
let obj = value
16
17
.as_object_mut()
17
18
.ok_or(eyre!("Failed to parse value"))?;
18
-
let key = obj
19
-
.remove(CRITERIA.get().ok_or(eyre!("Invalid state"))?)
20
-
.ok_or(eyre!("Key missing in record"))?;
19
+
20
+
let field_key = KEY.get().expect("Invalid state");
21
+
let record_key = if DELETE.get().is_some() {
22
+
obj.remove(field_key)
23
+
.ok_or(eyre!(format!("Key missing in record")))?
24
+
.to_string()
25
+
} else {
26
+
obj.get(field_key)
27
+
.ok_or(eyre!("Field missing in record"))?
28
+
.to_string()
29
+
};
21
30
22
-
Ok((
23
-
Some(key.to_string().into()),
24
-
value.to_string().as_str().into(),
25
-
))
31
+
Ok((Some(record_key.into()), value.to_string().as_str().into()))
26
32
}
27
33
28
34
#[smartmodule(init)]
29
35
fn init(params: SmartModuleExtraParams) -> Result<()> {
36
+
if params.get("delete").is_some() {
37
+
DELETE
38
+
.set(true)
39
+
.map_err(|_| eyre!("Failed to set input param"))?;
40
+
}
41
+
30
42
if let Some(key) = params.get("key") {
31
-
CRITERIA
32
-
.set(key.clone())
43
+
KEY.set(key.clone())
33
44
.map_err(|err| eyre!("failed setting key: {:#?}", err))
34
45
} else {
35
46
Err(SmartModuleInitError::MissingParam("key".to_string()).into())