···8283New async primitives that disallow intra-task concurrency, clone of `futures` and `futures-concurrency` for the new primitives.
8485-## TODO:
86-- [x] ScopedFuture
87-- [ ] static combinators (Join Race etc), see futures-concurrency
88-- [ ] `#[bsync]` or some compiler ScopedFuture generation
89-- [ ] growable combinators (eg. `FutureGroup`, `FuturesUnordered`) (require alloc?)
90-- [ ] unsound (needs `Forget`) multithreading
91-- [ ] "rethinking async rust"
92-- [ ] all of the above for streams
93-- [ ] rfc?
9495channels: need lifetimed receievers, probably needs `Forget` (arc-like channels would be unsafe)
96···141- need tons of interior mutability, since immutable/can't move means `poll` cannot take `&mut self`, cells everywhere
142 - nvm lots of unsafe code, but nothing really unsound
143- potentially bad error messages? stuff like `join!` will have to output code that manually sets up the waker self ref
000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
···8283New async primitives that disallow intra-task concurrency, clone of `futures` and `futures-concurrency` for the new primitives.
840000000008586channels: need lifetimed receievers, probably needs `Forget` (arc-like channels would be unsafe)
87···132- need tons of interior mutability, since immutable/can't move means `poll` cannot take `&mut self`, cells everywhere
133 - nvm lots of unsafe code, but nothing really unsound
134- potentially bad error messages? stuff like `join!` will have to output code that manually sets up the waker self ref
135+136+## TODO:
137+- [x] ScopedFuture
138+- [x] static combinators (Join Race etc), see futures-concurrency
139+- [x] `#[async_scoped]` or some compiler ScopedFuture generation
140+- [ ] doubly linked list waker registration
141+- [ ] repeating static time reactors - eg. make event poll every N seconds
142+- [ ] io uring reactors
143+- [ ] growable combinators (eg. `FutureGroup`, `FuturesUnordered`) (require alloc?)
144+- [ ] unsound (needs `Forget`) multithreading
145+- [ ] "rethinking async rust"
146+- [ ] all of the above for streams
147+- [ ] rfc?
148+149+# Chapter 3
150+151+man this really sucks
152+153+i need better things to do
154+155+issues from ch 2
156+- works great[*]
157+- *: incompatible with event loop - something still has to poll
158+ - we're back to doubly linked list of waker registration in event loop
159+ - this requires Forget
160+- ScopedFuture - Future interop sucks
161+162+163+structured concurrency with regular combinators:
164+- scope holds tasks
165+- scope cancels tasks when dropped
166+- tasks are ran by central executor
167+168+pure structured concurrency with borrow checker:
169+- high level block_on(), any task wake wakes every level up
170+- tasks have events
171+172+how do the tasks register to an event loop? they don't fuck
173+174+175+```rust
176+struct Task<F: Future> {
177+ inner: F,
178+ prev: *const Task,
179+ next: *const Task,
180+ waker: Waker,
181+}
182+```
183+184+&waker is passed into all sub-tasks, calling wake clone etc panics!!!
185+this is pretty jank
186+187+also waker doesn't have a lifetime so a safe code could easily register to external source that outlives the task
188+this is unsound
189+190+we need
191+```rust
192+struct WakerRegister {
193+ prev: *const WakerRegister,
194+ next: *const WakerRegister,
195+}
196+```
197+198+# Borrow Checked Structured Concurrency
199+200+An async system needs to have the following components:
201+202+- event loop : polls events and schedules tasks when they are ready
203+- tasks : state machines that progress and can await events from the event loop
204+- task combinators : tasks that compose other tasks into useful logic structures
205+206+Tasks will register themselves to events on the event loop, which will need to outlive tasks and register pointers to wake the tasks, so that they can again be polled.
207+This is incompatible with the borrow checker because the task pointers (wakers) are being stored inside an event loop with a lifetime that may exceed the tasks'.
208+209+`Waker` is effectively a `*const dyn Wake`. It is implemented using a custom `RawWakerVTable` rather than a `dyn` ptr to allow for `Wake::wake(self)`, which is not object safe.
210+This method is necessary for runtime implementations that rely on the wakers to be effectively `Arc<Task>`, since `wake(self)` consumes `self` and decrements the reference count.
211+212+There are two types of sound async runtimes that currently exist in rust:
213+214+[tokio](https://github.com/tokio-rs) and [smol](https://github.com/smol-rs) work using the afformentioned reference counting system to ensure wakers aren't dangling pointers to tasks that no longer exist.
215+216+[embassy](https://github.com/embassy-rs/embassy) and [rtic](https://github.com/rtic-rs/rtic) work by ensuring tasks are stored in `static` task pools for `N` tasks. Scheduled tasks are represented by an intrusively linked list to avoid allocation, and wakers can't be dangling pointers because completed tasks will refuse to add themselves back to the linked list, or will be replaced by a new task. This is useful in environments where it is desirable to avoid heap allocation, but requires the user annotate the maximum number of a specific task that can exist at one time, and fails to spawn tasks when they exceed that limit.
217+218+An async runtime where futures are allocated to the stack cannot be sound under this model because `Future::poll` allows any safe `Future` implementation to store or clone wakers wherever they want, which become dangling pointers after the stack allocated future goes out of scope. In order to prevent this, we must have a circular reference, where the task (`Task<'scope>: Wake`) contains a `&'scope Waker` and the `Waker` contains `*const dyn Wake`. For that to be safe, the `Waker` must never be moved. This cannot be possible because something needs to register the waker:
219+220+```rust
221+// waker is moved
222+poll(self: Pin<&mut Self>, cx: &mut Context<'_> /* '_ is the duration of the poll fn */) -> Poll<Self::Output> {
223+ // waker is moved!
224+ // can't register &Waker bc we run into the same problem,
225+ // and the waker only lives for '_
226+ store.register(cx.waker())
227+}
228+```
229+230+Effectively, by saying our waker can't move, we are saying it must be stored by the task, which means it can't be a useful waker. Instead, what we could do is have a waker-register (verb, not noun) that facilitates the binding of an immovable waker to an immovable task, where the waker is guaranteed to outlive the task:
231+232+```rust
233+pub trait Wake<'task> {
234+ fn wake(&self);
235+ fn register_waker(&mut self, waker: &'task Waker);
236+}
237+238+pub struct Waker {
239+ task: *const dyn Wake,
240+ valid: bool, // task is in charge of invalidating when it goes out of scope
241+}
242+243+pub struct WakerRegistration<'poll> {
244+ task: &'poll mut dyn Wake,
245+}
246+247+impl<'poll> WakerRegistration<'poll> {
248+ pub fn register<'task>(self, slot: &'task Waker)
249+ where
250+ 'task: 'poll,
251+ Self: 'task
252+ {
253+ *slot = Waker::new(self.task as *const dyn Wake);
254+ *task.register_waker(slot)
255+ }
256+}
257+```
258+259+This system works better because `WakerRegistration` only lives
260+261+262+263+Experienced rust programmers might be reading this and thinking I am stupid (true) because `Forget`
264+265+An astute (soon to be disappointed) reader might be thinking, as I did when I first learned about this sytem, "what if we ensured that there was only one `Waker` per `Task`, and gave the task a pointer to the waker, so that it could disable the waker when dropped?"
266+267+Unfortunately, there are a multitude of issues with this system
268+269+- In order to hold a pointer to the Waker from the
270+- Preventing a `Waker` from moving means panicking on the
271+272+Even if it was guaranteed that wakers could not be moved or `cloned` (by panicking on `clone`), and registration occured via `&Waker`, the task would still be unable to
273+274+https://conradludgate.com/posts/async-stack
275+276+## Structured Concurrency
277+278+https://trio.discourse.group/t/discussion-notes-on-structured-concurrency-or-go-statement-considered-harmful/25
279+https://kotlinlang.org/docs/coroutines-basics.html
280+281+Notably, the structured concurrency pattern fits very nicely with our hypothetical unsound stack based async runtime.
282+283+## WeakCell Pattern & Forget trait
284+285+https://github.com/rust-lang/rfcs/pull/3782
286+287+288+There are two solutions:
289+290+- `Wakers` panic on `clone()`
291+292+## Waker allocation problem & intra task concurrency
293+294+we can't do intra task concurrency because WeakRegistrations
295+296+
+4-10
futures-compat/src/lib.rs
···13//! that expects it to live for 'scope, and then the ScopedFutureWrapper is
14//! dropped
15//!
16-//!
17-//! ## TRIGGER WARNING
18-//!
19//! This code is not for the faint of heart. Read at your own risk.
2021use std::{
···34/// can transmute between them, but the waker will be completely invalid!
3536/// wraps an internal ScopedFuture, implements Future
37-pub struct ScopedFutureWrapper<'scope, F: ScopedFuture<'scope> + 'scope> {
38 inner: UnsafeCell<F>,
39 marker: PhantomData<&'scope ()>,
40}
···44{
45 type Output = F::Output;
4647- fn poll(
48- self: std::pin::Pin<&mut Self>,
49- cx: &mut std::task::Context<'_>,
50- ) -> Poll<Self::Output> {
51 // # Safety
52 //
53 // Transmutes `Waker` into `&'scope dyn Wake`.
···102 marker: PhantomData<&'scope ()>,
103}
104105-impl<'scope, F: Future> ScopedFuture<'scope>
106 for UnscopedFutureWrapper<'scope, F>
107{
108 type Output = F::Output;
···140 }
141}
142143-impl<'scope, F: Future> UnscopedFutureWrapper<'scope, F> {
144 pub unsafe fn from_future(f: F) -> Self {
145 Self {
146 inner: f.into(),
···13//! that expects it to live for 'scope, and then the ScopedFutureWrapper is
14//! dropped
15//!
00016//! This code is not for the faint of heart. Read at your own risk.
1718use std::{
···31/// can transmute between them, but the waker will be completely invalid!
3233/// wraps an internal ScopedFuture, implements Future
34+pub struct ScopedFutureWrapper<'scope, F: ScopedFuture<'scope>> {
35 inner: UnsafeCell<F>,
36 marker: PhantomData<&'scope ()>,
37}
···41{
42 type Output = F::Output;
4344+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
00045 // # Safety
46 //
47 // Transmutes `Waker` into `&'scope dyn Wake`.
···96 marker: PhantomData<&'scope ()>,
97}
9899+impl<'scope, F: Future + 'scope> ScopedFuture<'scope>
100 for UnscopedFutureWrapper<'scope, F>
101{
102 type Output = F::Output;
···134 }
135}
136137+impl<'scope, F: Future + 'scope> UnscopedFutureWrapper<'scope, F> {
138 pub unsafe fn from_future(f: F) -> Self {
139 Self {
140 inner: f.into(),
+3-2
futures-core/src/lib.rs
···35/// The waker is no longer tied to the actual future's lifetime, making it
36/// unsound to not have either static tasks or reference counting.
37/// To avoid this, we want to use a &'scope waker instead, with 1 waker / task.
038#[diagnostic::on_unimplemented(
39- message = "The type `{Self}` must implement the `ScopedFuture` trait.",
40- label = "Missing `ScopedFuture` implementation",
41 note = "If you are trying to await a `task::Future` from within a `ScopedFuture`, note that the systems are incompatible."
42)]
43pub trait ScopedFuture<'scope> {
···35/// The waker is no longer tied to the actual future's lifetime, making it
36/// unsound to not have either static tasks or reference counting.
37/// To avoid this, we want to use a &'scope waker instead, with 1 waker / task.
38+#[must_use = "futures do nothing unless you `.await` or poll them"]
39#[diagnostic::on_unimplemented(
40+ message = "`{Self}` is not a `ScopedFuture`",
41+ label = "`{Self}` is not a `ScopedFuture`",
42 note = "If you are trying to await a `task::Future` from within a `ScopedFuture`, note that the systems are incompatible."
43)]
44pub trait ScopedFuture<'scope> {
+49-5
futures-core/src/task.rs
···1-/// A task that can be woken.
2-///
3-/// This acts as a handle for a reactor to indicate when a `ScopedFuture` is
4-/// once again ready to be polled.
5-pub trait Wake<'scope> {
06 fn wake(&self);
00000000000000000000000000000000000000000007}
···1+use std::{mem, ptr::NonNull, sync::atomic::AtomicPtr};
2+3+// Task: Wake
4+//
5+// Wake must not outlive event loop/storage
6+pub trait Wake<'events> {
7 fn wake(&self);
8+ // task can't outlive event loop -> holds &'events
9+ fn register_waker(&self, waker: &'events Waker);
10+}
11+12+// type Waker<'events> = Option<NonNull<dyn Wake<'events>>>;
13+14+pub struct Waker<'events> {
15+ task: *const dyn Wake<'events>,
16+}
17+18+// wakers must outlive 'task
19+impl<'events> Waker<'events> {
20+ pub fn new(task: *const dyn Wake<'events>) -> Self {
21+ Self { task: task.into() }
22+ }
23+24+ pub fn wake(self) {
25+ unsafe { self.task.as_ref() }.inspect(|task| task.wake());
26+ }
27+}
28+29+pub struct WakerRegistration<'events> {
30+ task: &'events dyn Wake<'events>, // valid for all of 'events
31+}
32+33+impl<'events> WakerRegistration<'events> {
34+ // slot is valid for all 'events
35+ pub fn register(self, slot: &'events mut Waker<'events>) {
36+ // Cast from 'events to 'static
37+ //
38+ // # Safety
39+ //
40+ // This is safe because the drop guard guarantees that the task ptr (which lives for static)
41+ // becomes null when the wake is dropped, ensuring the dangling pointer is never dereferenced.
42+ let dangling_task = unsafe {
43+ mem::transmute::<&'events dyn Wake<'events>, *const dyn Wake<'events>>(
44+ self.task,
45+ )
46+ };
47+ slot.task = dangling_task;
48+49+ (*self.task).register_waker(slot);
50+ }
51}
+107-56
futures-derive/src/lib.rs
···1use proc_macro::TokenStream;
2use quote::{ToTokens, quote};
3use syn::{
4- Expr, ExprAwait, FnArg, GenericArgument, ItemFn, Pat, ReturnType,
5- Signature, parse_macro_input, visit_mut::VisitMut,
6};
700000000000000008#[proc_macro_attribute]
9pub fn async_scoped(_: TokenStream, item: TokenStream) -> TokenStream {
10- let mut input = parse_macro_input!(item as ItemFn);
11-12 // Wraps *every* async expression within the function block with
13 // `ScopedFutureWrapper`, allowing them to be treated as regular `Future`
14 // impls.
···16 // This will cause a compiler error if any expression being awaited is not
17 // a `ScopedFuture`, which is intentional because the `Future` and
18 // `ScopedFuture` systems are incompatible.
19- ScopedFutureWrappingVisitor.visit_item_fn_mut(&mut input);
2021- // Wrap the function with `UnscopedFutureWrapper` to convert it back into
22- // a `ScopedFuture`.
23- wrap_async_with_scoped(&input).into()
000000000000000000000000000024}
2500026/// Takes async fn that returns anonymous `Future` impl.
27/// Generates fn that returns `UnscopedFutureWrapper` wrapper for the the anonymous `Future` impl.
28///
···39///
40/// to actually implement this (capture all lifetimes) we use `ScopedFuture<'_> + '_` so the compiler can infer
41/// lifetimes from the anonymous future impl returned by the actual inner async fn
42-fn wrap_async_with_scoped(
43- ItemFn {
44- attrs,
45- vis,
46- sig:
47- Signature {
48- constness,
49- unsafety,
50- ident,
51- generics,
52- inputs,
53- output,
54- ..
55- },
56- block,
57- }: &ItemFn,
58-) -> proc_macro2::TokenStream {
59- let output = match output {
60- ReturnType::Default => quote! { () },
61- ReturnType::Type(_, ty) => quote! { #ty },
62- };
06364- let inner_args: Vec<syn::Ident> = inputs
65- .iter()
66- .filter_map(|param| match param {
67- FnArg::Receiver(_) => Some(quote::format_ident!("self")),
68- FnArg::Typed(typed) => {
69- if let Pat::Ident(ident) = &*typed.pat {
70- Some(ident.ident.to_owned())
71- } else {
72- None
73- }
74- }
75- })
76- .collect();
7778- let has_lifetime_dependency = inputs.iter().any(|param| match param {
79- FnArg::Receiver(receiver) => receiver.reference.is_some(),
80- FnArg::Typed(pat) => has_lifetime_dependency(&pat.ty),
81- });
0008283- let outer_output = if has_lifetime_dependency {
84- quote! { futures_core::ScopedFuture<'_, Output = #output> + '_ }
85- } else {
86- quote! { futures_core::ScopedFuture<'static, Output = #output> }
87- };
00000000008889- quote! {
90- #(#attrs)* #vis #constness #unsafety fn #ident #generics (#inputs) -> impl #outer_output {
91- async #constness #unsafety fn __inner (#inputs) -> #output #block
9293- let future = __inner(#(#inner_args),*);
9495- unsafe { futures_compat::UnscopedFutureWrapper::from_future(future) }
00096 }
97 }
098}
99100/// Determines if typed pattern contains a reference or dependency on a
···1use proc_macro::TokenStream;
2use quote::{ToTokens, quote};
3use syn::{
4+ Expr, ExprAwait, FnArg, GenericArgument, ItemFn, ReturnType,
5+ parse_macro_input, parse_quote, parse2, visit_mut::VisitMut,
6};
78+/// Takes async fn that returns anonymous `Future` impl.
9+/// Generates fn that returns `UnscopedFutureWrapper` wrapper for the the anonymous `Future` impl.
10+///
11+/// ```rust
12+/// fn my_func<'a, 'b>(a: &'a A, b: &'b B) -> impl ScopedFuture<'a + 'b, Output = T> + 'a + 'b {
13+/// let output = async move { [body] } // compilers turns this into -> impl Future<Output = T> + 'a + 'b
14+/// unsafe { UnscopedFutureWrapper::from_future(output) }
15+/// }
16+/// ```
17+///
18+/// see https://rust-lang.github.io/rfcs/2394-async_await.html#lifetime-capture-in-the-anonymous-future
19+/// for more context on lifetime capture
20+/// - resulting ScopedFuture needs to be constrained to not outlive the lifetimes of any references
21+///
22+/// to actually implement this (capture all lifetimes) we use `ScopedFuture<'_> + '_` so the compiler can infer
23+/// lifetimes from the anonymous future impl returned by the actual inner async fn
24#[proc_macro_attribute]
25pub fn async_scoped(_: TokenStream, item: TokenStream) -> TokenStream {
26+ let mut item_fn = parse_macro_input!(item as ItemFn);
027 // Wraps *every* async expression within the function block with
28 // `ScopedFutureWrapper`, allowing them to be treated as regular `Future`
29 // impls.
···31 // This will cause a compiler error if any expression being awaited is not
32 // a `ScopedFuture`, which is intentional because the `Future` and
33 // `ScopedFuture` systems are incompatible.
34+ ScopedFutureWrappingVisitor.visit_item_fn_mut(&mut item_fn);
3536+ // disable async since it is moved to the block
37+ item_fn.sig.asyncness = None;
38+39+ // wrap block with UnscopedFutureWrapper
40+ let block = *item_fn.block;
41+ *item_fn.block = parse_quote! {
42+ {
43+ let future = async move #block;
44+ unsafe { futures_compat::UnscopedFutureWrapper::from_future(future) }
45+ }
46+ };
47+48+ let output = match &item_fn.sig.output {
49+ ReturnType::Default => quote! { () },
50+ ReturnType::Type(_, ty) => quote! { #ty },
51+ };
52+53+ let has_lifetime_dependency =
54+ item_fn.sig.inputs.iter().any(|param| match param {
55+ FnArg::Receiver(receiver) => receiver.reference.is_some(),
56+ FnArg::Typed(pat) => has_lifetime_dependency(&pat.ty),
57+ });
58+59+ // set outer fn output to ScopedFuture<'_/'static, Output = #output>
60+ item_fn.sig.output = if has_lifetime_dependency {
61+ parse_quote! { -> impl futures_core::ScopedFuture<'_, Output = #output> + '_ }
62+ } else {
63+ parse_quote! { -> impl futures_core::ScopedFuture<'static, Output = #output> }
64+ };
65+66+ item_fn.to_token_stream().into()
67}
6869+/// This currently is impossible to do the `futures_compat` workarounds not
70+/// being compatible with closures.
71+///
72/// Takes async fn that returns anonymous `Future` impl.
73/// Generates fn that returns `UnscopedFutureWrapper` wrapper for the the anonymous `Future` impl.
74///
···85///
86/// to actually implement this (capture all lifetimes) we use `ScopedFuture<'_> + '_` so the compiler can infer
87/// lifetimes from the anonymous future impl returned by the actual inner async fn
88+// #[proc_macro]
89+// pub fn closure(input: TokenStream) -> TokenStream {
90+// // let ExprClosure {
91+// // attrs,
92+// // lifetimes,
93+// // constness,
94+// // movability,
95+// // capture,
96+// // inputs,
97+// // output,
98+// // body,
99+// // ..
100+// // } = parse_macro_input!(input as ExprClosure);
101+// let mut closure = parse_macro_input!(input as ExprClosure);
102+// // disable async because we move it to inner
103+// closure.asyncness = None;
104+// let body = closure.body;
105+106+// // let output = match closure.output {
107+// // ReturnType::Default => parse_quote! { () },
108+// // ReturnType::Type(_, ty) => parse_quote! { #ty },
109+// // };
110111+// // let outer_output =
112+// // parse_quote! { futures_core::ScopedFuture<'_, Output = #output> + '_ };
00000000000113114+// closure.body = parse_quote! {{
115+// let output = async move { #body };
116+// unsafe { futures_compat::UnscopedFutureWrapper::from_future(output) }
117+// }};
118+// // closure.output = outer_output;
119+// closure.to_token_stream().into()
120+// }
121122+/// Wraps a block of optionally async statements and expressions in an anonymous `ScopedFuture` impl.
123+///
124+/// This generates a modified block of the form:
125+///
126+/// ```rust
127+/// {
128+/// let output = async { <original block, mapped to convert all `ScopedFuture` to `Future`> };
129+/// unsafe { futures_compat::UnscopedFutureWrapper::from_future(output) }
130+/// }
131+/// ```
132+#[proc_macro]
133+pub fn block(input: TokenStream) -> TokenStream {
134+ // block is formed { **expr/stmt }, so we need to surround the inputs in {}
135+ let input = proc_macro2::TokenStream::from(input);
136+ let block_input = quote! { { #input } };
137138+ let mut block = parse2(block_input).expect("Failed to parse as block.");
00139140+ ScopedFutureWrappingVisitor.visit_block_mut(&mut block);
141142+ quote! {
143+ {
144+ let output = async #block;
145+ unsafe { futures_compat::UnscopedFutureWrapper::from_future(output) }
146 }
147 }
148+ .into()
149}
150151/// Determines if typed pattern contains a reference or dependency on a
+2-14
futures/src/lib.rs
···1#![no_std]
23pub use futures_combinators;
4-use futures_compat::{ScopedFutureWrapper, UnscopedFutureWrapper};
5pub use futures_core;
6use futures_core::ScopedFuture;
7pub use futures_derive::async_scoped;
···16}
1718#[async_scoped]
19-fn test(a: i32, b: &i32) -> () {
20- // evil().await;
21- let x = inner(a, &b).await;
22- // async {}.await;
23-24- let test_block = futures_derive::block! { 1 + 1; 2 }.await;
25-26- // let test_closure = futures_derive::closure! { |&ab, &cd| ab + cd };
27-28- // let asdf = futures_derive::closure! { |a: &i32| {
29- // *a + b
30- // }};
31- // let x = asdf(&a).await;
32}
3334fn test2<'a>(a: i32) {}