ott ("on that topic") - a scalable and distributed custom feed generator

Assign record keys (the post URI) to like records as well

Changed files
+62 -32
connectors
crates
ott-ingest
src
smart-modules
construct-post-uri
src
+5 -4
README.md
··· 9 9 ## Install dependencies 10 10 11 11 ```shell 12 - brew install kind helm skaffold fluvio 13 - ``` 12 + # Install k8s tooling 13 + brew install kind helm skaffold 14 14 15 - ## Rust 15 + # Install fvm and fluvio cli 16 + curl -fsS https://hub.infinyon.cloud/install/install.sh | bash 16 17 17 - ```shell 18 + # Install rust 18 19 curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh 19 20 ``` 20 21
+5
connectors/likes-config.yaml
··· 8 8 batch-size: "4 MB" 9 9 http: 10 10 endpoint: "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.like" 11 + transforms: 12 + - uses: aleeve/construct-post-uri@0.1.0 13 + - uses: aleeve/assign-record-key@0.1.0 14 + with: 15 + key: uri
+17 -15
crates/ott-ingest/src/main.rs
··· 32 32 #[derive(Debug, Deserialize, Clone)] 33 33 struct Like { 34 34 did: String, 35 - #[serde(rename = "commit.record.subject.uri")] 36 35 uri: String, 37 36 } 38 37 ··· 89 88 }); 90 89 }, 91 90 Some(Ok(record)) = like_stream.next() => { 92 - let like: Like = serde_json::from_slice(record.value()).unwrap(); 93 - lcc.entry(like.uri) 94 - .and_compute_with(|maybe_entry| { 95 - if let Some(entry) = maybe_entry { 96 - let mut post = entry.into_value(); 97 - if post.count < 2 { 98 - post.count +=1; 99 - warn!("Incread counter for {:#?}", post); 100 - Op::Put(post) 91 + if let Ok(like) = serde_json::from_slice::<Like>(record.value()) { 92 + lcc.entry(like.uri) 93 + .and_compute_with(|maybe_entry| { 94 + if let Some(entry) = maybe_entry { 95 + let mut post = entry.into_value(); 96 + if post.count < 20 { 97 + post.count +=1; 98 + warn!("Incread counter for {:#?}", post); 99 + Op::Put(post) 100 + } else { 101 + Op::Remove 102 + } 101 103 } else { 102 - Op::Remove 104 + Op::Nop // Skip as post is out of cache 103 105 } 104 - } else { 105 - Op::Nop // Skip as post is out of cache 106 - } 107 - }); 106 + }); 107 + } else { 108 + warn!("Failed deserializing, likely not like commit"); 109 + }; 108 110 } 109 111 } 110 112 }
+35 -13
smart-modules/construct-post-uri/src/lib.rs
··· 23 23 } 24 24 25 25 fn get_uri(obj: &Map<String, Value>) -> Result<String> { 26 - let did = obj 27 - .get("did") 26 + let collection = obj 27 + .get("commit") 28 + .and_then(|v| v.get("collection")) 28 29 .and_then(|v| v.as_str()) 29 - .ok_or(eyre!("did missing or not a string"))?; 30 + .ok_or(eyre!("Missing commit.collection"))?; 31 + match collection { 32 + "app.bsky.feed.post" => { 33 + let did = obj 34 + .get("did") 35 + .and_then(|v| v.as_str()) 36 + .ok_or(eyre!("did missing or not a string"))?; 30 37 31 - let commit = obj.get("commit").ok_or(eyre!("commit missing"))?; 38 + let commit = obj.get("commit").ok_or(eyre!("commit missing"))?; 32 39 33 - let collection = commit 34 - .get("collection") 35 - .and_then(|v| v.as_str()) 36 - .ok_or(eyre!("commit.collection missing or not a string"))?; 40 + let collection = commit 41 + .get("collection") 42 + .and_then(|v| v.as_str()) 43 + .ok_or(eyre!("commit.collection missing or not a string"))?; 37 44 38 - let rkey = commit 39 - .get("rkey") 40 - .and_then(|v| v.as_str()) 41 - .ok_or(eyre!("commit.rkey missing or not a string"))?; 45 + let rkey = commit 46 + .get("rkey") 47 + .and_then(|v| v.as_str()) 48 + .ok_or(eyre!("commit.rkey missing or not a string"))?; 42 49 43 - Ok(format!("at://{did}/{collection}/{rkey}")) 50 + Ok(format!("at://{did}/{collection}/{rkey}")) 51 + }, 52 + "app.bsky.feed.like" => { 53 + let uri = obj 54 + .get("commit") 55 + .and_then(|v| v.get("record")) 56 + .and_then(|v| v.get("subject")) 57 + .and_then(|v| v.get("uri")) 58 + .and_then(|v| v.as_str()) 59 + .ok_or(eyre!("Likely not commit message"))?; 60 + Ok(uri.to_string()) 61 + } 62 + &_ => { 63 + Err(eyre!("Not supported yet")) 64 + } 65 + } 44 66 }