+1
crates/Cargo.lock
+1
crates/Cargo.lock
+1
crates/ott-embed/Cargo.toml
+1
crates/ott-embed/Cargo.toml
+82
-2
crates/ott-embed/src/main.rs
+82
-2
crates/ott-embed/src/main.rs
···
1
+
use ott_embed::tei_client::TextEmbedding;
2
+
use serde::{Deserialize, Serialize};
3
+
use tokio::sync::mpsc::{self, Receiver, Sender};
4
+
5
+
use tokio_stream::StreamExt;
6
+
use tracing::{error, info, warn};
7
+
use tracing_subscriber::EnvFilter;
8
+
9
+
use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset};
10
+
use ott_types::{Embedding, Post};
11
+
1
12
const TEI_URL: &str = "http://localhost:8080";
13
+
const TOPIC: &str = "posts";
14
+
const PARTITION: u32 = 0;
2
15
3
-
fn main() {
4
-
println!("Hello, world!");
16
+
#[tokio::main]
17
+
async fn main() {
18
+
tracing_subscriber::fmt()
19
+
.with_ansi(true) // Colors enabled (default)
20
+
.with_env_filter(EnvFilter::from_default_env())
21
+
.init();
22
+
23
+
let (embed_tx, embed_rx) = tokio::sync::mpsc::channel::<Post>(1000);
24
+
let (store_tx, store_rx) = tokio::sync::mpsc::channel::<Embedding>(1000);
25
+
26
+
let read_task = tokio::spawn(async { read_task(embed_tx).await });
27
+
let embed_task = tokio::spawn(async { embed_task(embed_rx, store_tx).await });
28
+
let store_task = tokio::spawn(async { store_task(store_rx).await });
29
+
30
+
let _result = tokio::join!(read_task, embed_task, store_task);
31
+
}
32
+
33
+
async fn read_task(sink: Sender<Post>) {
34
+
let fluvio = Fluvio::connect()
35
+
.await
36
+
.expect("Failed to connect to Fluvio");
37
+
38
+
let config = ConsumerConfigExtBuilder::default()
39
+
.topic(TOPIC)
40
+
.partition(PARTITION)
41
+
.offset_start(Offset::beginning())
42
+
.build()
43
+
.expect("Failed to build consumer config");
44
+
let mut stream = fluvio
45
+
.consumer_with_config(config)
46
+
.await
47
+
.expect("Failed to create consumer");
48
+
49
+
warn!("Ready to start consuming posts");
50
+
while let Some(message) = stream.next().await
51
+
&& let Ok(record) = message
52
+
{
53
+
let post: Post = serde_json::from_slice(record.value()).expect("Invalid post message");
54
+
sink.send(post).await.expect("Failed to internally send post");
55
+
}
56
+
}
57
+
58
+
async fn embed_task(mut posts: Receiver<Post>, sink: Sender<Embedding>) {
59
+
let tei_client = TextEmbedding::new(TEI_URL);
60
+
61
+
warn!("Ready to start embedding posts");
62
+
while let Some(post) = posts.recv().await {
63
+
let embedding = tei_client.embed(&post.text).await;
64
+
match embedding {
65
+
Ok(vec) => {
66
+
sink.send(Embedding {
67
+
uri: post.uri,
68
+
vector: vec,
69
+
})
70
+
.await
71
+
.expect("Failed to send embedding between tasks");
72
+
}
73
+
Err(e) => {
74
+
error!(e);
75
+
}
76
+
};
77
+
}
78
+
}
79
+
80
+
async fn store_task(mut embeddings: Receiver<Embedding>) {
81
+
warn!("Ready to start storing embeddings");
82
+
while let Some(embedding) = embeddings.recv().await {
83
+
warn!("Embedded {}", embedding.uri)
84
+
}
5
85
}
+2
-2
crates/ott-filter/src/main.rs
+2
-2
crates/ott-filter/src/main.rs
···
134
134
.offset_start(Offset::beginning())
135
135
.build()
136
136
.expect("Failed to build consumer config");
137
-
let posts_stream = fluvio
137
+
let stream = fluvio
138
138
.consumer_with_config(config)
139
139
.await
140
140
.expect("Failed to create consumer");
141
-
posts_stream
141
+
stream
142
142
}
143
143
144
144
struct PostEmbedding {