WIP push-to-talk Letta chat frontend
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

display transcription in frontend

graham.systems 9eaa88c6 5678f3d8

verified
+199 -144
+13 -3
src-tauri/src/cartesia/commands.rs
··· 1 - use crate::state::AppState; 2 3 #[tauri::command] 4 - pub async fn start_stt(state: tauri::State<'_, AppState>) -> Result<(), ()> { 5 - state.stt_manager.transcribe().await; 6 Ok(()) 7 } 8
··· 1 + use tauri::ipc::Channel; 2 + 3 + use crate::{cartesia::stt::TranscriptionWord, state::AppState}; 4 5 #[tauri::command] 6 + pub async fn start_stt( 7 + state: tauri::State<'_, AppState>, 8 + on_event: Channel<TranscriptionWord>, 9 + ) -> Result<(), ()> { 10 + let mut rec = state.stt_manager.transcribe().await; 11 + 12 + while let Some(word) = rec.recv().await { 13 + on_event.send(word).expect("failed to send word to client"); 14 + } 15 + 16 Ok(()) 17 } 18
+147 -134
src-tauri/src/cartesia/stt.rs
··· 2 use crate::devices::input::InputDeviceManager; 3 use dasp::Signal; 4 use futures_util::{ 5 - future::join, 6 stream::{SplitSink, SplitStream}, 7 SinkExt, StreamExt, 8 }; 9 - use serde::Deserialize; 10 use std::sync::Arc; 11 - use tauri::async_runtime::{Mutex, RwLock}; 12 use tokio::net::TcpStream; 13 use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; 14 use tungstenite::{Bytes, Error, Message, Utf8Bytes}; ··· 29 input: Arc<InputDeviceManager>, 30 } 31 32 - #[derive(Deserialize, Debug)] 33 - struct TranscriptionWord { 34 word: String, 35 start: f32, 36 end: f32, 37 } 38 39 #[derive(Deserialize, Debug)] 40 - #[serde(untagged)] 41 - enum TranscriptionMessage { 42 Transcript { 43 - #[serde(rename = "type")] 44 - message_type: String, 45 request_id: String, 46 is_final: bool, 47 text: String, 48 duration: Option<f32>, 49 language: Option<String>, 50 - words: Vec<TranscriptionWord>, 51 }, 52 Error { 53 - #[serde(rename = "type")] 54 - message_type: String, 55 message: String, 56 request_id: Option<String>, 57 }, 58 Done { 59 - #[serde(rename = "type")] 60 - message_type: String, 61 request_id: String, 62 }, 63 } ··· 71 } 72 } 73 74 - async fn handle_messages( 75 - &self, 76 - mut reader: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>, 77 - ) { 78 - println!("handling messages"); 79 - 80 - while let Some(message) = reader.next().await { 81 - match message { 82 - Ok(Message::Text(msg)) => { 83 - println!("got message: {}", msg); 84 - 85 - match serde_json::from_str::<TranscriptionMessage>(msg.as_str()) { 86 - Ok(TranscriptionMessage::Transcript { 87 - message_type, 88 - request_id, 89 - is_final, 90 - text, 91 - duration, 92 - language, 93 - words, 94 - }) => { 95 - println!("transcribed: {:?}", text); 96 - } 97 - Ok(TranscriptionMessage::Done { 98 - message_type, 99 - request_id, 100 - }) => { 101 - println!("recieved done message, stopping"); 102 - self.stop_transcription().await; 103 - break; 104 - } 105 - Ok(msg) => { 106 - println!("received a message: {:?}", msg) 107 - } 108 - Err(err) => { 109 - eprintln!("failed to parse message: {}", err); 110 - self.stop_transcription().await; 111 - break; 112 - } 113 - } 114 - } 115 - Ok(msg) => { 116 - println!("got non-text message: {}", msg); 117 - } 118 - Err(Error::ConnectionClosed) | Err(Error::AlreadyClosed) => { 119 - eprintln!("connection closed"); 120 - break; 121 - } 122 - Err(err) => { 123 - eprintln!("err with connection: {}", err); 124 - break; 125 - } 126 - } 127 - } 128 - } 129 - 130 - async fn send_frames( 131 - &self, 132 - signal: impl Signal<Frame = i16>, 133 - writer: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>, 134 - ) { 135 - let mut w = writer.lock().await; 136 - let mut buffer = Vec::with_capacity(3200); 137 - 138 - for frame in signal.until_exhausted() { 139 - let status = self.status.read().await; 140 - 141 - match *status { 142 - SttStatus::Connected => { 143 - buffer.push(frame.to_le_bytes()); 144 - 145 - if buffer.len() == buffer.capacity() { 146 - w.send(tungstenite::Message::Binary(Bytes::from_iter( 147 - buffer.iter().flat_map(|f| *f), 148 - ))) 149 - .await 150 - .expect("failed to send binary frame message to STT"); 151 - 152 - buffer.clear(); 153 - } 154 - } 155 - SttStatus::Flushing => { 156 - println!("flushing audio data"); 157 - 158 - w.send(tungstenite::Message::Binary(Bytes::from_iter( 159 - buffer.iter().flat_map(|f| *f), 160 - ))) 161 - .await 162 - .expect("failed to send binary frame message to STT"); 163 - 164 - w.send(Message::Text(Utf8Bytes::from_static("done"))) 165 - .await 166 - .expect("failed to send done message"); 167 - 168 - let _ = w.flush().await; 169 - break; 170 - } 171 - _ => (), 172 - } 173 - } 174 - } 175 - 176 /// Begins transcribing text via Cartesia. Blocks until `stop_transcription` is called 177 - pub async fn transcribe(&self) { 178 - println!("starting transcription"); 179 - 180 { 181 let mut status = self.status.write().await; 182 *status = SttStatus::Opening; ··· 184 185 let input = self.input.start_listening().await; 186 let stream = self.client.open_stt_connection().await; 187 - let (tx, rx) = stream.split(); 188 - let writer = Arc::new(Mutex::new(tx)); 189 190 { 191 let mut status = self.status.write().await; 192 *status = SttStatus::Connected; 193 } 194 195 - // Handle incoming messages 196 - let read = self.handle_messages(rx); 197 - let write = self.send_frames(input, writer.clone()); 198 199 - join(read, write).await; 200 - 201 - writer 202 - .lock() 203 - .await 204 - .send(Message::Close(None)) 205 - .await 206 - .expect("failed to close socket"); 207 } 208 209 async fn flush_audio(&self) { ··· 213 214 /// Terminates the microphone signal and halts the transcription processes 215 pub async fn stop_transcription(&self) { 216 - println!("stopping transcription"); 217 - 218 self.flush_audio().await; 219 self.input.stop_listening().await; 220 } 221 }
··· 2 use crate::devices::input::InputDeviceManager; 3 use dasp::Signal; 4 use futures_util::{ 5 stream::{SplitSink, SplitStream}, 6 SinkExt, StreamExt, 7 }; 8 + use serde::{Deserialize, Serialize}; 9 use std::sync::Arc; 10 + use tauri::async_runtime::{channel, spawn, Mutex, Receiver, RwLock, Sender}; 11 use tokio::net::TcpStream; 12 use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; 13 use tungstenite::{Bytes, Error, Message, Utf8Bytes}; ··· 28 input: Arc<InputDeviceManager>, 29 } 30 31 + #[derive(Deserialize, Debug, Serialize)] 32 + pub struct TranscriptionWord { 33 word: String, 34 start: f32, 35 end: f32, 36 } 37 38 #[derive(Deserialize, Debug)] 39 + pub enum MessageType { 40 + #[serde(rename = "transcript")] 41 + Transcript, 42 + #[serde(rename = "error")] 43 + Error, 44 + #[serde(rename = "done")] 45 + Done, 46 + #[serde(rename = "flush_done")] 47 + FlushDone, 48 + } 49 + 50 + #[derive(Deserialize, Debug)] 51 + #[serde(tag = "type", rename_all = "snake_case")] 52 + pub enum TranscriptionMessage { 53 Transcript { 54 request_id: String, 55 is_final: bool, 56 text: String, 57 duration: Option<f32>, 58 language: Option<String>, 59 + words: Option<Vec<TranscriptionWord>>, 60 }, 61 Error { 62 message: String, 63 request_id: Option<String>, 64 }, 65 Done { 66 + request_id: String, 67 + }, 68 + FlushDone { 69 request_id: String, 70 }, 71 } ··· 79 } 80 } 81 82 /// Begins transcribing text via Cartesia. Blocks until `stop_transcription` is called 83 + pub async fn transcribe(&self) -> Receiver<TranscriptionWord> { 84 { 85 let mut status = self.status.write().await; 86 *status = SttStatus::Opening; ··· 88 89 let input = self.input.start_listening().await; 90 let stream = self.client.open_stt_connection().await; 91 + let (socket_in, socket_out) = stream.split(); 92 + let (fn_in, fn_out) = channel::<TranscriptionWord>(500); 93 + let writer = Arc::new(Mutex::new(socket_in)); 94 95 { 96 let mut status = self.status.write().await; 97 *status = SttStatus::Connected; 98 } 99 100 + spawn(handle_messages( 101 + self.status.clone(), 102 + self.input.clone(), 103 + socket_out, 104 + writer.clone(), 105 + fn_in, 106 + )); 107 + spawn(send_frames(self.status.clone(), input, writer.clone())); 108 109 + return fn_out; 110 } 111 112 async fn flush_audio(&self) { ··· 116 117 /// Terminates the microphone signal and halts the transcription processes 118 pub async fn stop_transcription(&self) { 119 self.flush_audio().await; 120 self.input.stop_listening().await; 121 } 122 } 123 + 124 + async fn handle_messages( 125 + status: Arc<RwLock<SttStatus>>, 126 + input: Arc<InputDeviceManager>, 127 + mut socket_reader: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>, 128 + socket_writer: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>, 129 + out_writer: Sender<TranscriptionWord>, 130 + ) { 131 + while let Some(message) = socket_reader.next().await { 132 + match message { 133 + Ok(Message::Text(msg)) => { 134 + match serde_json::from_str::<TranscriptionMessage>(msg.as_str()) { 135 + Ok(TranscriptionMessage::Transcript { 136 + request_id: _, 137 + is_final: _, 138 + text, 139 + duration: _, 140 + language: _, 141 + words, 142 + }) => match words { 143 + Some(ws) => { 144 + for w in ws { 145 + out_writer.send(w).await.expect("failed to queue word") 146 + } 147 + } 148 + None => (), 149 + }, 150 + Ok(TranscriptionMessage::Done { request_id: _ }) => { 151 + input.stop_listening().await; 152 + socket_writer 153 + .lock() 154 + .await 155 + .send(Message::Close(None)) 156 + .await 157 + .expect("failed to close socket"); 158 + 159 + break; 160 + } 161 + Ok(msg) => { 162 + println!("received a message: {:?}", msg) 163 + } 164 + Err(err) => { 165 + eprintln!("failed to parse message: {}", err); 166 + 167 + { 168 + let mut status = status.write().await; 169 + *status = SttStatus::Flushing; 170 + } 171 + 172 + input.stop_listening().await; 173 + break; 174 + } 175 + } 176 + } 177 + Ok(msg) => { 178 + println!("got non-text message: {}", msg); 179 + } 180 + Err(Error::ConnectionClosed) | Err(Error::AlreadyClosed) => { 181 + eprintln!("connection closed"); 182 + break; 183 + } 184 + Err(err) => { 185 + eprintln!("err with connection: {}", err); 186 + break; 187 + } 188 + } 189 + } 190 + } 191 + 192 + async fn send_frames( 193 + status: Arc<RwLock<SttStatus>>, 194 + signal: impl Signal<Frame = i16>, 195 + writer: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>, 196 + ) { 197 + let mut w = writer.lock().await; 198 + let mut buffer = Vec::with_capacity(3200); 199 + 200 + for frame in signal.until_exhausted() { 201 + let status = status.read().await; 202 + 203 + match *status { 204 + SttStatus::Connected => { 205 + buffer.push(frame.to_le_bytes()); 206 + 207 + if buffer.len() == buffer.capacity() { 208 + w.send(tungstenite::Message::Binary(Bytes::from_iter( 209 + buffer.iter().flat_map(|f| *f), 210 + ))) 211 + .await 212 + .expect("failed to send binary frame message to STT"); 213 + 214 + buffer.clear(); 215 + } 216 + } 217 + SttStatus::Flushing => { 218 + w.send(tungstenite::Message::Binary(Bytes::from_iter( 219 + buffer.iter().flat_map(|f| *f), 220 + ))) 221 + .await 222 + .expect("failed to send binary frame message to STT"); 223 + 224 + w.send(Message::Text(Utf8Bytes::from_static("done"))) 225 + .await 226 + .expect("failed to send done message"); 227 + 228 + let _ = w.flush().await; 229 + break; 230 + } 231 + _ => (), 232 + } 233 + } 234 + }
-2
src-tauri/src/devices/input.rs
··· 38 } 39 40 pub async fn start_listening(&self) -> impl Signal<Frame = i16> { 41 - println!("opening device stream"); 42 let config = self 43 .device 44 .supported_input_configs() ··· 76 } 77 78 pub async fn stop_listening(&self) { 79 - println!("dropping device stream"); 80 let mut s = self.stream.lock().await; 81 *s = None; 82 }
··· 38 } 39 40 pub async fn start_listening(&self) -> impl Signal<Frame = i16> { 41 let config = self 42 .device 43 .supported_input_configs() ··· 75 } 76 77 pub async fn stop_listening(&self) { 78 let mut s = self.stream.lock().await; 79 *s = None; 80 }
+39 -5
src/routes/+page.svelte
··· 1 <script lang="ts"> 2 - import { invoke } from "@tauri-apps/api/core"; 3 4 let keyInvocation = $state( 5 invoke("has_secret", { name: "cartesia_api_key" }), 6 ); 7 - let isRecording = $state(false); 8 9 - $effect(() => { 10 - if (isRecording) invoke("start_stt"); 11 - else invoke("stop_stt"); 12 }); 13 </script> 14 ··· 68 <p>{err}</p> 69 {/await} 70 </div>
··· 1 <script lang="ts"> 2 + import { invoke, Channel } from "@tauri-apps/api/core"; 3 + 4 + interface TranscriptionWord { 5 + word: string; 6 + start: number; 7 + end: number; 8 + } 9 10 + let isRecording = $state(false); 11 + let draft = $state(""); 12 + let history = $state(new Array<string>()) 13 let keyInvocation = $state( 14 invoke("has_secret", { name: "cartesia_api_key" }), 15 ); 16 17 + $effect(async () => { 18 + if (isRecording) { 19 + const onEvent = new Channel<TranscriptionWord>() 20 + 21 + onEvent.onmessage = word => { 22 + draft += word.word 23 + } 24 + 25 + invoke("start_stt", { onEvent }); 26 + } else { 27 + await invoke("stop_stt"); 28 + 29 + if (draft) { 30 + history.unshift(draft) 31 + draft = "" 32 + } 33 + } 34 }); 35 </script> 36 ··· 90 <p>{err}</p> 91 {/await} 92 </div> 93 + 94 + {#if draft} 95 + <div class="bg-rose-pine-surface bg-rose-pine-subtle italic rounded-md p-3 flex gap-3 shadow-xl"> 96 + {draft} 97 + </div> 98 + {/if} 99 + 100 + {#each history as text} 101 + <div class="bg-rose-pine-base rounded-md p-3 flex gap-3 shadow-xl"> 102 + {text} 103 + </div> 104 + {/each}