ott ("on that topic"): a scalable, distributed custom feed generator

Adjust ingestor

Changed files
+33 -42
crates
-1
crates/ott-ingest/Cargo.toml
··· 14 14 tokio-stream = "0.1.17" 15 15 tracing = "0.1.41" 16 16 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } 17 - typify = "0.4.3" 18 17 19 18 [build-dependencies] 20 19 anyhow = "1.0.100"
-3
crates/ott-ingest/src/at_types.rs
··· 1 - use typify::import_types; 2 - 3 - import_types!(schema = "schemas/like.json");
-1
crates/ott-ingest/src/lib.rs
··· 1 - pub mod at_types; 2 1 pub mod tei_client;
+33 -37
crates/ott-ingest/src/main.rs
··· 1 + use serde::Deserialize; 1 2 use tokio::{ 2 3 select, 3 4 time::{Duration, sleep}, 4 5 }; 5 6 use tokio_stream::StreamExt; 6 7 use tracing::{debug, error, info, warn}; 7 - use tracing_subscriber::{EnvFilter, fmt}; 8 + use tracing_subscriber::{EnvFilter, field::display, fmt}; 8 9 9 10 use fluvio::{ 10 11 Fluvio, Offset, ··· 14 15 ops::compute::{CompResult, Op}, 15 16 sync::Cache, 16 17 }; 17 - use ott_ingest::at_types; 18 18 use ott_ingest::tei_client; 19 19 20 20 const LIKES_TOPIC: &str = "raw-likes"; 21 21 const POSTS_TOPIC: &str = "raw-posts"; 22 22 const PARTITION_NUM: u32 = 0; 23 23 24 - #[derive(Clone)] 25 - struct BskyPost { 26 - likes: u32, 24 + #[derive(Debug, Deserialize, Clone)] 25 + struct Post { 26 + did: String, 27 27 uri: String, 28 + #[serde(default)] 29 + count: u32, 30 + } 31 + 32 + #[derive(Debug, Deserialize, Clone)] 33 + struct Like { 28 34 did: String, 29 - text: String, 35 + #[serde(rename = "commit.record.subject.uri")] 36 + uri: String, 30 37 } 31 - 32 - impl BskyPost {} 33 38 34 39 #[tokio::main] 35 40 async fn main() { ··· 38 43 .with_env_filter(EnvFilter::from_default_env()) 39 44 .init(); 40 45 41 - let posts_cache: Cache<String, u32> = Cache::builder() 46 + let posts_cache: Cache<String, Post> = Cache::builder() 42 47 .time_to_live(Duration::from_secs(60 * 60)) 43 48 .build(); 44 49 ··· 72 77 let pcc = posts_cache.clone(); 73 78 let lcc = posts_cache.clone(); 74 79 select! 
{ 75 - result = like_stream.next() => { 76 - match result { 77 - Some(Ok(record)) => { 78 - record.value(); 79 - pcc.entry(String::from("hej")) 80 - .and_compute_with(|maybe_entry| { 81 - if let Some(entry) = maybe_entry { 82 - let counter = entry.into_value(); 83 - if counter < 2 { 84 - Op::Put(counter.saturating_add(1)) // Update 85 - } else { 86 - Op::Remove 87 - } 88 - } else { 89 - Op::Put(1) // Insert 90 - } 91 - }); 92 - } 93 - Some(Err(e)) => {todo!()} 94 - None => {todo!()} 95 - } 96 - } 97 - 98 - Some(Ok(result)) = posts_stream.next() => { 99 - lcc.entry(String::from("hej")) 80 + Some(Ok(record)) = posts_stream.next() => { 81 + let post: Post = serde_json::from_slice(record.value()).unwrap(); 82 + pcc.entry(post.uri.clone()) 83 + .and_compute_with(|maybe_entry| { 84 + if maybe_entry.is_some() { 85 + Op::Nop 86 + } else { 87 + Op::Put(post) // Insert 88 + } 89 + }); 90 + }, 91 + Some(Ok(record)) = like_stream.next() => { 92 + let like: Like = serde_json::from_slice(record.value()).unwrap(); 93 + lcc.entry(like.uri) 100 94 .and_compute_with(|maybe_entry| { 101 95 if let Some(entry) = maybe_entry { 102 - let counter = entry.into_value(); 103 - if counter < 2 { 104 - Op::Put(counter.saturating_add(1)) // Update 96 + let mut post = entry.into_value(); 97 + if post.count < 2 { 98 + post.count +=1; 99 + warn!("Incread counter for {:#?}", post); 100 + Op::Put(post) 105 101 } else { 106 102 Op::Remove 107 103 }