crossing the streams
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(generator): add shutdown lifecycle event (#97)

## Summary
- emit shutdown event when the generator loop fully exits
- have ServeLoop evict generators based on `.shutdown`
- update generator docs with shutdown lifecycle event
- expect shutdown frames in tests

## Testing
- `./scripts/check.sh`
- `cd docs && npm run build`

authored by ndyg.ca and committed by

GitHub d158f4cb 4dfddda2

+21 -16
+2 -1
docs/src/content/docs/reference/generators.mdx
··· 41 41 | `<topic>.recv` | Output value from the generator | 42 42 | `<topic>.stop` | Generator pipeline has stopped. The \`meta.reason\` field is a string enum with values `finished`, `error`, `terminate` and `update`. When `finished` or `error`, the pipeline will be restarted automatically; `terminate` means it was stopped manually and the generator loop for this topic/context will shut down. `update` indicates the generator reloaded due to a new `.spawn` frame. | 43 43 | `<topic>.parse.error` | Script failed to parse | 44 + | `<topic>.shutdown` | Generator loop has fully exited; ServeLoop evicts it | 44 45 45 - All events include `source_id` which is the ID of the generator instance. When a `.stop` frame has `meta.reason` set to `update`, it also includes `update_id` referencing the spawn that triggered the reload. 46 + All events include `source_id` which is the ID of the generator instance. When a `.stop` frame has `meta.reason` set to `update`, it also includes `update_id` referencing the spawn that triggered the reload. ServeLoop evicts a generator when it receives a `<topic>.shutdown` frame. 46 47 47 48 ## Configuration Options 48 49
+9 -2
src/generators/generator.rs
··· 120 120 )?; 121 121 } 122 122 123 - GeneratorEventKind::Shutdown => { /* no frame */ } 123 + GeneratorEventKind::Shutdown => { 124 + store.append( 125 + Frame::builder(format!("{}.shutdown", loop_ctx.topic), loop_ctx.context_id) 126 + .meta(json!({ "source_id": task.id.to_string() })) 127 + .build(), 128 + )?; 129 + } 124 130 } 125 131 126 132 Ok(GeneratorEvent { ··· 333 339 &task, 334 340 GeneratorEventKind::Stop(reason.clone()), 335 341 ); 336 - if matches!(reason, StopReason::Terminate) { 342 + if matches!(reason, StopReason::Terminate) || matches!(reason, StopReason::Error { .. }) { 343 + let _ = emit_event(&store, &loop_ctx, &task, GeneratorEventKind::Shutdown); 337 344 break; 338 345 } else if let StopReason::Update { .. } = reason { 339 346 if let Some(nt) = next_task.take() {
+2 -12
src/generators/serve.rs
··· 101 101 continue; 102 102 } 103 103 104 - if let Some(prefix) = frame.topic.strip_suffix(".stop") { 105 - if let Some(reason) = frame 106 - .meta 107 - .as_ref() 108 - .and_then(|m| m.get("reason")) 109 - .and_then(|v| v.as_str()) 110 - { 111 - if reason == "terminate" || reason == "error" { 112 - let key = (prefix.to_string(), frame.context_id); 113 - active.remove(&key); 114 - } 115 - } 104 + if let Some(prefix) = frame.topic.strip_suffix(".shutdown") { 105 + active.remove(&(prefix.to_string(), frame.context_id)); 116 106 continue; 117 107 } 118 108 }
+8 -1
src/generators/tests.rs
··· 2 2 use crate::generators::generator::emit_event; 3 3 use crate::nu::ReturnOptions; 4 4 use nu_protocol; 5 - use scru128::{self, Scru128Id}; 5 + use scru128; 6 6 use std::time::Duration; 7 7 use tempfile::TempDir; 8 8 ··· 449 449 let stop = recver.recv().await.unwrap(); 450 450 assert_eq!(stop.topic, "sleeper.stop"); 451 451 assert_eq!(stop.meta.unwrap()["reason"], "terminate"); 452 + assert_eq!(recver.recv().await.unwrap().topic, "sleeper.shutdown"); 452 453 453 454 store 454 455 .append(Frame::builder("sleeper.spawn", ctx.id).hash(hash).build()) ··· 551 552 let frame = recver.recv().await.unwrap(); 552 553 assert_eq!(frame.topic, "echo.stop"); 553 554 assert_eq!(frame.meta.unwrap()["reason"], "terminate"); 555 + assert_eq!(recver.recv().await.unwrap().topic, "echo.shutdown"); 554 556 555 557 store 556 558 .append(Frame::builder("echo.send", ctx.id).build()) ··· 734 736 let stop1 = recver.recv().await.unwrap(); 735 737 assert_eq!(stop1.topic, "gen1.stop"); 736 738 assert_eq!(stop1.meta.unwrap()["reason"], "terminate"); 739 + let shutdown1 = recver.recv().await.unwrap(); 740 + assert_eq!(shutdown1.topic, "gen1.shutdown"); 737 741 738 742 let msg_hash = store.cas_insert("ping").await.unwrap(); 739 743 store ··· 868 872 ) 869 873 .unwrap(); 870 874 assert!(matches!(ev.kind, GeneratorEventKind::Stop(_))); 875 + 876 + let _ = emit_event(&store, &loop_ctx, &task, GeneratorEventKind::Shutdown).unwrap(); 877 + assert_eq!(store.head("helper.shutdown", ZERO_CONTEXT).is_some(), true); 871 878 }