Scalable and distributed custom feed generator, ott - on that topic
1use eyre::eyre; 2use fluvio_smartmodule::{RecordData, Result, SmartModuleRecord, smartmodule}; 3use serde_json::{Map, Value}; 4 5#[smartmodule(filter_map)] 6pub fn filter_map(record: &SmartModuleRecord) -> Result<Option<(Option<RecordData>, RecordData)>> { 7 let key = record.key.clone(); 8 9 let string = std::str::from_utf8(record.value.as_ref())?; 10 let mut value: Value = serde_json::from_str(string)?; 11 let obj = value 12 .as_object_mut() 13 .ok_or(eyre!("Failed to parse value"))?; 14 15 if let Ok(uri) = get_uri(obj) { 16 let uri_value = Value::String(uri); 17 obj.insert("uri".to_string(), uri_value); 18 19 Ok(Some((key, value.to_string().as_str().into()))) 20 } else { 21 Ok(None) 22 } 23} 24 25fn get_uri(obj: &Map<String, Value>) -> Result<String> { 26 let collection = obj 27 .get("commit") 28 .and_then(|v| v.get("collection")) 29 .and_then(|v| v.as_str()) 30 .ok_or(eyre!("Missing commit.collection"))?; 31 match collection { 32 "app.bsky.feed.post" => { 33 let did = obj 34 .get("did") 35 .and_then(|v| v.as_str()) 36 .ok_or(eyre!("did missing or not a string"))?; 37 38 let commit = obj.get("commit").ok_or(eyre!("commit missing"))?; 39 40 let collection = commit 41 .get("collection") 42 .and_then(|v| v.as_str()) 43 .ok_or(eyre!("commit.collection missing or not a string"))?; 44 45 let rkey = commit 46 .get("rkey") 47 .and_then(|v| v.as_str()) 48 .ok_or(eyre!("commit.rkey missing or not a string"))?; 49 50 Ok(format!("at://{did}/{collection}/{rkey}")) 51 }, 52 "app.bsky.feed.like" => { 53 let uri = obj 54 .get("commit") 55 .and_then(|v| v.get("record")) 56 .and_then(|v| v.get("subject")) 57 .and_then(|v| v.get("uri")) 58 .and_then(|v| v.as_str()) 59 .ok_or(eyre!("Likely not commit message"))?; 60 Ok(uri.to_string()) 61 } 62 &_ => { 63 Err(eyre!("Not supported yet")) 64 } 65 } 66}