crossing the streams
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(generator): termination for duplex generators (#82)

authored by ndyg.ca and committed by

GitHub 59f5a355 b6e425ed

+82 -25
+28 -24
src/generators/generator.rs
··· 1 1 use scru128::Scru128Id; 2 2 use tokio::task::JoinHandle; 3 3 4 - use futures::StreamExt; 4 + use nu_protocol::{ByteStream, ByteStreamType, PipelineData, Signals, Span, Value}; 5 + use std::sync::atomic::AtomicBool; 6 + use std::sync::Arc; 5 7 use tokio::io::AsyncReadExt; 6 - use tokio_stream::wrappers::ReceiverStream; 7 - 8 - use nu_protocol::{ByteStream, ByteStreamType, PipelineData, Span, Value}; 9 8 10 9 use crate::nu; 11 10 use crate::nu::ReturnOptions; ··· 96 95 }; 97 96 let opts: GeneratorScriptOptions = nu_config.deserialize_options().unwrap_or_default(); 98 97 98 + // Create and set the interrupt signal on the engine state 99 + let interrupt = Arc::new(AtomicBool::new(false)); 100 + engine.state.set_signals(Signals::new(interrupt.clone())); 101 + 99 102 let task = GeneratorLoop { 100 103 id: spawn_frame.id, 101 104 context_id: spawn_frame.context_id, ··· 186 189 task: &GeneratorLoop, 187 190 rx: tokio::sync::mpsc::Receiver<Frame>, 188 191 ) -> PipelineData { 189 - let base_topic = task.topic.clone(); 190 - let stream = ReceiverStream::new(rx); 191 - let stream = stream 192 - .filter_map(move |frame: Frame| { 193 - let store = store.clone(); 194 - let topic = format!("{}.send", base_topic); 195 - async move { 192 + let topic = format!("{}.send", task.topic); 193 + let signals = task.engine.state.signals().clone(); 194 + let mut rx = rx; 195 + let iter = std::iter::from_fn(move || loop { 196 + if signals.interrupted() { 197 + return None; 198 + } 199 + 200 + match rx.try_recv() { 201 + Ok(frame) => { 196 202 if frame.topic == topic { 197 203 if let Some(hash) = frame.hash { 198 - if let Ok(content) = store.cas_read(&hash).await { 199 - return Some(content); 204 + if let Ok(bytes) = store.cas_read_sync(&hash) { 205 + if let Ok(content) = String::from_utf8(bytes) { 206 + return Some(content); 207 + } 200 208 } 201 209 } 202 210 } 203 - None 204 211 } 205 - }) 206 - .boxed(); 207 - 208 - let handle = tokio::runtime::Handle::current(); 209 - let mut stream = Some(stream); 210 - let iter = std::iter::from_fn(move || { 211 - if let Some(ref mut s) = stream { 212 - handle.block_on(async { s.next().await }) 213 - } else { 214 - None 212 + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { 213 + std::thread::sleep(std::time::Duration::from_millis(10)); 214 + continue; 215 + } 216 + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { 217 + return None; 218 + } 215 219 } 216 220 }); 217 221
+54 -1
src/generators/tests.rs
··· 1 1 use super::*; 2 + use std::time::Duration; 2 3 use tempfile::TempDir; 3 4 4 5 use crate::nu; ··· 505 506 } 506 507 507 508 #[tokio::test] 509 + async fn test_duplex_terminate_stops() { 510 + let (store, engine, ctx) = setup_test_env(); 511 + 512 + { 513 + let store = store.clone(); 514 + let engine = engine.clone(); 515 + tokio::spawn(async move { 516 + serve(store, engine).await.unwrap(); 517 + }); 518 + } 519 + 520 + let script = r#"{ run: {|| each { |x| $"echo: ($x)" } }, duplex: true }"#; 521 + let hash = store.cas_insert(script).await.unwrap(); 522 + 523 + store 524 + .append(Frame::builder("echo.spawn", ctx.id).hash(hash).build()) 525 + .unwrap(); 526 + 527 + let options = ReadOptions::builder() 528 + .context_id(ctx.id) 529 + .follow(FollowOption::On) 530 + .tail(true) 531 + .build(); 532 + let mut recver = store.read(options).await; 533 + 534 + // expect start 535 + let frame = recver.recv().await.unwrap(); 536 + assert_eq!(frame.topic, "echo.start"); 537 + 538 + // terminate while generator waits for input 539 + store 540 + .append(Frame::builder("echo.terminate", ctx.id).build()) 541 + .unwrap(); 542 + let frame = recver.recv().await.unwrap(); 543 + assert_eq!(frame.topic, "echo.terminate"); 544 + 545 + // expect stop frame with reason terminate 546 + let frame = recver.recv().await.unwrap(); 547 + assert_eq!(frame.topic, "echo.stop"); 548 + assert_eq!(frame.meta.unwrap()["reason"], "terminate"); 549 + 550 + store 551 + .append(Frame::builder("echo.send", ctx.id).build()) 552 + .unwrap(); 553 + let frame = recver.recv().await.unwrap(); 554 + assert_eq!(frame.topic, "echo.send"); 555 + 556 + // ensure no additional frames after stop 557 + assert_no_more_frames(&mut recver).await; 558 + } 559 + 560 + #[tokio::test] 508 561 async fn test_spawn_error_eviction() { 509 562 let (store, engine, ctx) = setup_test_env(); 510 563 ··· 545 598 assert_eq!(stop_frame.meta.as_ref().unwrap()["reason"], "spawn.error"); 546 599 547 600 // Allow ServeLoop to process the spawn.error and evict the generator 548 - tokio::time::sleep(std::time::Duration::from_millis(50)).await; 601 + tokio::time::sleep(Duration::from_millis(50)).await; 549 602 550 603 let good_script = r#"{ run: {|| "ok" } }"#; 551 604 store