Scalable and distributed custom feed generator, ott - on that topic
1use eyre::eyre;
2use fluvio_smartmodule::{RecordData, Result, SmartModuleRecord, smartmodule};
3use serde_json::{Map, Value};
4
5#[smartmodule(filter_map)]
6pub fn filter_map(record: &SmartModuleRecord) -> Result<Option<(Option<RecordData>, RecordData)>> {
7 let key = record.key.clone();
8
9 let string = std::str::from_utf8(record.value.as_ref())?;
10 let mut value: Value = serde_json::from_str(string)?;
11 let obj = value
12 .as_object_mut()
13 .ok_or(eyre!("Failed to parse value"))?;
14
15 if let Ok(uri) = get_uri(obj) {
16 let uri_value = Value::String(uri);
17 obj.insert("uri".to_string(), uri_value);
18
19 Ok(Some((key, value.to_string().as_str().into())))
20 } else {
21 Ok(None)
22 }
23}
24
25fn get_uri(obj: &Map<String, Value>) -> Result<String> {
26 let collection = obj
27 .get("commit")
28 .and_then(|v| v.get("collection"))
29 .and_then(|v| v.as_str())
30 .ok_or(eyre!("Missing commit.collection"))?;
31 match collection {
32 "app.bsky.feed.post" => {
33 let did = obj
34 .get("did")
35 .and_then(|v| v.as_str())
36 .ok_or(eyre!("did missing or not a string"))?;
37
38 let commit = obj.get("commit").ok_or(eyre!("commit missing"))?;
39
40 let collection = commit
41 .get("collection")
42 .and_then(|v| v.as_str())
43 .ok_or(eyre!("commit.collection missing or not a string"))?;
44
45 let rkey = commit
46 .get("rkey")
47 .and_then(|v| v.as_str())
48 .ok_or(eyre!("commit.rkey missing or not a string"))?;
49
50 Ok(format!("at://{did}/{collection}/{rkey}"))
51 },
52 "app.bsky.feed.like" => {
53 let uri = obj
54 .get("commit")
55 .and_then(|v| v.get("record"))
56 .and_then(|v| v.get("subject"))
57 .and_then(|v| v.get("uri"))
58 .and_then(|v| v.as_str())
59 .ok_or(eyre!("Likely not commit message"))?;
60 Ok(uri.to_string())
61 }
62 &_ => {
63 Err(eyre!("Not supported yet"))
64 }
65 }
66}