refactored futues_core::Future, futures_compat, added race back

Changed files
+154 -130
futures-combinators
+1 -1
futures-combinators/Cargo.toml
··· 11 11 [dependencies] 12 12 futures-core = { workspace = true } 13 13 futures-compat = { workspace = true } 14 + futures-util = { workspace = true } 14 15 lifetime-guard = { workspace = true } 15 - futures = { workspace = true }
+25 -41
futures-combinators/src/join.rs
··· 1 1 use crate::wake::WakeArray; 2 - use futures::future::FusedFuture; 3 - use futures::future::MaybeDone; 4 - use futures::future::maybe_done; 5 - use futures_compat::BespokeFutureWrapper; 2 + use futures_compat::LocalWaker; 3 + use futures_core::FusedFuture; 4 + use futures_util::maybe_done::MaybeDone; 5 + use futures_util::maybe_done::maybe_done; 6 6 use std::pin::Pin; 7 - use std::task::Context; 8 7 use std::task::Poll; 9 8 10 9 /// from [futures-concurrency](https://github.com/yoshuawuyts/futures-concurrency/tree/main) ··· 17 16 type Output; 18 17 19 18 /// The [`ScopedFuture`] implementation returned by this method. 20 - type Future: futures_core::Future<Output = Self::Output>; 19 + type Future: futures_core::Future<LocalWaker, Output = Self::Output>; 21 20 22 21 /// Waits for multiple futures to complete. 23 22 /// ··· 31 30 pub trait JoinExt { 32 31 fn along_with<Fut>(self, other: Fut) -> Join2<Self, Fut> 33 32 where 34 - Self: Sized + futures_core::Future, 35 - Fut: futures_core::Future, 33 + Self: Sized + futures_core::Future<LocalWaker>, 34 + Fut: futures_core::Future<LocalWaker>, 36 35 { 37 36 (self, other).join() 38 37 } 39 38 } 40 39 41 - impl<T> JoinExt for T where T: futures_core::Future {} 40 + impl<T> JoinExt for T where T: futures_core::Future<LocalWaker> {} 42 41 43 42 macro_rules! impl_join_tuple { 44 43 ($namespace:ident $StructName:ident $($F:ident)+) => { ··· 50 49 51 50 #[allow(non_snake_case)] 52 51 #[must_use = "futures do nothing unless you `.await` or poll them"] 53 - pub struct $StructName<$($F: futures_core::Future),+> { 54 - $($F: MaybeDone<BespokeFutureWrapper<$F>>,)* 52 + pub struct $StructName<$($F: futures_core::Future<LocalWaker>),+> { 53 + $($F: MaybeDone<$F>,)* 55 54 wake_array: WakeArray<{$namespace::LEN}>, 56 55 } 57 56 58 - impl<$($F: futures_core::Future),+> futures_core::Future for $StructName<$($F),+> 57 + impl<$($F: futures_core::Future<LocalWaker>),+> futures_core::Future<LocalWaker> for $StructName<$($F),+> 59 58 { 60 59 type Output = ($($F::Output),+); 61 60 62 61 #[allow(non_snake_case)] 63 - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 62 + fn poll(self: Pin<&mut Self>, waker: Pin<&LocalWaker>) -> Poll<Self::Output> { 64 63 let this = unsafe { self.get_unchecked_mut() }; 65 64 66 65 let wake_array = unsafe { Pin::new_unchecked(&this.wake_array) }; ··· 70 69 let mut $F = unsafe { Pin::new_unchecked(&mut this.$F) }; 71 70 )+ 72 71 73 - // extract reference to ValueGuard from Context 74 - // this is safe because futures_core::Future are isolated 75 - // from core::future::Future impls and guaranteed to have 76 - // their cx.wakers represented in the nonstandard format 77 - wake_array.register_parent(unsafe { futures_compat::waker_to_guard(cx.waker()) }); 72 + wake_array.register_parent(waker); 78 73 79 74 let mut ready = true; 80 75 81 76 $( 82 77 let index = $namespace::Indexes::$F as usize; 83 - // cx to feed children 84 - let waker = unsafe { futures_compat::guard_to_waker(wake_array.child_guard_ptr(index).unwrap_unchecked()) }; 85 - let mut child_cx = Context::from_waker(&waker); 78 + let waker = unsafe { wake_array.child_guard_ptr(index).unwrap_unchecked() }; 86 79 87 80 // ready if MaybeDone is Done or just completed (converted to Done) 88 81 // unsafe / against Future api contract to poll after Gone/Future is finished 89 82 ready &= if unsafe { dbg!(wake_array.take_woken(index).unwrap_unchecked()) } { 90 - $F.as_mut().poll(&mut child_cx).is_ready() 83 + $F.as_mut().poll(waker).is_ready() 91 84 } else { 92 85 $F.is_terminated() 93 86 }; ··· 111 104 } 112 105 } 113 106 114 - impl<$($F: futures_core::Future),+> Join for ($($F),+) { 107 + impl<$($F: futures_core::Future<LocalWaker>),+> Join for ($($F),+) { 115 108 type Output = ($($F::Output),*); 116 109 type Future = $StructName<$($F),+>; 117 110 ··· 120 113 let ($($F),+) = self; 121 114 122 115 $StructName { 123 - $($F: maybe_done(unsafe { futures_compat::bespoke_future_to_std($F) }),)* 116 + $($F: maybe_done($F),)* 124 117 wake_array: WakeArray::new(), 125 118 } 126 119 } ··· 147 140 use futures_core::{Future, Wake}; 148 141 use lifetime_guard::guard::ValueGuard; 149 142 150 - use crate::wake::{DummyWaker, wake_bespoke_waker}; 143 + use crate::wake::{DummyWaker, local_wake, poll_fn}; 151 144 152 145 use super::*; 153 146 154 - use std::future::poll_fn; 155 147 use std::pin; 156 148 use std::ptr::NonNull; 157 149 ··· 159 151 fn counters() { 160 152 let mut x1 = 0; 161 153 let mut x2 = 0; 162 - let f1 = poll_fn(|cx| { 163 - unsafe { wake_bespoke_waker(cx.waker()) }; 154 + let f1 = poll_fn(|waker| { 155 + local_wake(waker); 164 156 x1 += 1; 165 157 if x1 == 4 { 166 158 Poll::Ready(x1) ··· 168 160 Poll::Pending 169 161 } 170 162 }); 171 - let f2 = poll_fn(|cx| { 172 - unsafe { wake_bespoke_waker(cx.waker()) }; 163 + let f2 = poll_fn(|waker| { 164 + local_wake(waker); 173 165 x2 += 1; 174 166 if x2 == 5 { 175 167 Poll::Ready(x2) ··· 180 172 let guard = pin::pin!(ValueGuard::new(NonNull::new( 181 173 &mut DummyWaker as *mut dyn Wake, 182 174 ))); 183 - let waker = unsafe { futures_compat::guard_to_waker(guard.as_ref()) }; 184 - let mut cx = Context::from_waker(&waker); 185 - let mut join = unsafe { 186 - ( 187 - futures_compat::std_future_to_bespoke(f1), 188 - futures_compat::std_future_to_bespoke(f2), 189 - ) 190 - } 191 - .join(); 175 + let join = (f1, f2).join(); 192 176 let mut pinned = pin::pin!(join); 193 177 for _ in 0..4 { 194 - assert_eq!(pinned.as_mut().poll(&mut cx), Poll::Pending); 178 + assert_eq!(pinned.as_mut().poll(guard.as_ref()), Poll::Pending); 195 179 } 196 - assert_eq!(pinned.poll(&mut cx), Poll::Ready((4, 5))); 180 + assert_eq!(pinned.poll(guard.as_ref()), Poll::Ready((4, 5))); 197 181 } 198 182 199 183 // #[test]
+2 -2
futures-combinators/src/lib.rs
··· 1 1 pub mod join; 2 - // pub mod race; 2 + pub mod race; 3 3 mod wake; 4 4 5 5 use join::*; 6 - // use race::*; 6 + use race::*;
+84 -78
futures-combinators/src/race.rs
··· 1 - use futures_core::{ScopedFuture, Wake}; 2 - use futures_util::WakeStore; 1 + use futures_util::LocalWaker; 2 + 3 + use crate::wake::WakeArray; 4 + use std::pin::Pin; 5 + use std::task::Context; 3 6 use std::{cell::Cell, task::Poll}; 4 7 5 8 /// from [futures-concurrency](https://github.com/yoshuawuyts/futures-concurrency/tree/main) ··· 7 10 /// 8 11 /// Awaits multiple future at once, returning as soon as one completes. The 9 12 /// other futures are cancelled. 10 - pub trait Race<'scope> { 13 + pub trait Race { 11 14 /// The resulting output type. 12 15 type Output; 13 16 14 17 /// The [`ScopedFuture`] implementation returned by this method. 15 - type Future: ScopedFuture<'scope, Output = Self::Output>; 18 + type Future: futures_core::Future<LocalWaker, Output = Self::Output>; 16 19 17 20 /// Wait for the first future to complete. 18 21 /// ··· 24 27 } 25 28 26 29 pub trait RaceExt<'scope> { 27 - fn race_with<Fut>(self, other: Fut) -> Race2<'scope, Self, Fut> 30 + fn race_with<Fut>(self, other: Fut) -> Race2<Self, Fut> 28 31 where 29 - Self: Sized + 'scope + ScopedFuture<'scope>, 30 - Fut: ScopedFuture<'scope> + 'scope, 32 + Self: Sized + futures_core::Future<LocalWaker>, 33 + Fut: futures_core::Future<LocalWaker>, 31 34 { 32 35 (self, other).race() 33 36 } 34 37 } 35 38 36 - impl<'scope, T> RaceExt<'scope> for T where T: ScopedFuture<'scope> {} 39 + impl<'scope, T> RaceExt<'scope> for T where T: futures_core::Future<LocalWaker> {} 37 40 38 41 macro_rules! impl_race_tuple { 39 42 ($namespace:ident $StructName:ident $OutputsName:ident $($F:ident)+) => { 40 43 mod $namespace { 41 - use super::*; 42 - 43 - #[allow(non_snake_case)] 44 - pub struct Wakers<'scope> { 45 - $(pub $F: WakeStore<'scope>,)* 46 - } 47 - 48 - #[allow(non_snake_case)] 49 - pub struct WakerRefs<'scope> { 50 - $(pub $F: Cell<Option<&'scope dyn Wake<'scope>>>,)* 51 - } 44 + #[repr(u8)] 45 + pub(super) enum Indexes { $($F,)+ } 46 + pub(super) const LEN: usize = [$(Indexes::$F,)+].len(); 52 47 } 53 48 54 49 pub enum $OutputsName<$($F,)+> { ··· 77 72 78 73 #[allow(non_snake_case)] 79 74 #[must_use = "futures do nothing unless you `.await` or poll them"] 80 - pub struct $StructName<'scope, $($F: ScopedFuture<'scope>),+> { 75 + pub struct $StructName<$($F: futures_core::Future<LocalWaker>),+> { 81 76 $($F: $F,)* 82 - wakers: $namespace::Wakers<'scope>, 83 - refs: $namespace::WakerRefs<'scope>, 77 + wake_array: WakeArray<{$namespace::LEN}>, 84 78 } 85 79 86 - impl<'scope, $($F: ScopedFuture<'scope> + 'scope),+> ScopedFuture<'scope> 87 - for $StructName<'scope, $($F),+> 80 + impl<'scope, $($F: futures_core::Future<LocalWaker>),+> futures_core::Future<LocalWaker> 81 + for $StructName<$($F),+> 88 82 { 89 83 type Output = $OutputsName<$($F::Output,)+>; 90 84 91 - fn poll(&'scope self, wake: &'scope dyn Wake<'scope>) -> Poll<Self::Output> { 85 + #[allow(non_snake_case)] 86 + fn poll(self: Pin<&mut Self>, waker: Pin<&LocalWaker>) -> Poll<Self::Output> { 87 + let this = unsafe { self.get_unchecked_mut() }; 88 + 89 + let wake_array = unsafe { Pin::new_unchecked(&this.wake_array) }; 92 90 $( 93 - self.wakers.$F.set_parent(wake); 94 - self.refs.$F.replace(Some(&self.wakers.$F)); 91 + let mut $F = unsafe { Pin::new_unchecked(&mut this.$F) }; 92 + )+ 95 93 96 - if self.wakers.$F.take_ready() { 97 - // By polling the future, we create our self-referential structure for lifetime `'scope`. 98 - // 99 - // SAFETY: 100 - // `unwrap_unchecked` is safe because we just inserted `Some` into `refs.$F`, 101 - // so it is guaranteed to be `Some`. 102 - if let Poll::Ready(res) = self.$F.poll(unsafe { (&self.refs.$F.get()).unwrap_unchecked() }) { 94 + wake_array.register_parent(waker); 95 + 96 + $( 97 + let index = $namespace::Indexes::$F as usize; 98 + let waker = unsafe { wake_array.child_guard_ptr(index).unwrap_unchecked() }; 99 + 100 + // this is safe because we know index < LEN 101 + if unsafe { wake_array.take_woken(index).unwrap_unchecked() } { 102 + if let Poll::Ready(res) = $F.as_mut().poll(waker) { 103 103 return Poll::Ready($OutputsName::$F(res)); 104 104 } 105 105 } ··· 109 109 } 110 110 } 111 111 112 - impl<'scope, $($F: ScopedFuture<'scope> + 'scope),+> Race<'scope> for ($($F),+) { 112 + impl<'scope, $($F: futures_core::Future<LocalWaker>),+> Race for ($($F),+) { 113 113 type Output = $OutputsName<$($F::Output),*>; 114 - type Future = $StructName<'scope, $($F),+>; 114 + type Future = $StructName<$($F),+>; 115 115 116 116 #[allow(non_snake_case)] 117 117 fn race(self) -> Self::Future { ··· 119 119 120 120 $StructName { 121 121 $($F: $F,)* 122 - wakers: $namespace::Wakers { 123 - $($F: WakeStore::new(),)* 124 - }, 125 - refs: $namespace::WakerRefs { 126 - $($F: Option::None.into(),)* 127 - }, 122 + wake_array: WakeArray::new(), 128 123 } 129 124 } 130 125 } ··· 147 142 mod tests { 148 143 #![no_std] 149 144 150 - use futures_util::{noop_wake, poll_fn}; 145 + use std::{pin, ptr::NonNull}; 146 + 147 + use futures_core::{Future, Wake}; 148 + use lifetime_guard::guard::ValueGuard; 149 + 150 + use crate::wake::{DummyWaker, local_wake, poll_fn}; 151 151 152 152 use super::*; 153 153 154 154 #[test] 155 155 fn counters() { 156 - let x1 = Cell::new(0); 157 - let x2 = Cell::new(0); 158 - let f1 = poll_fn(|wake| { 159 - wake.wake(); 160 - x1.set(x1.get() + 1); 161 - if x1.get() == 4 { 162 - Poll::Ready(x1.get()) 156 + let mut x1 = 0; 157 + let mut x2 = 0; 158 + let f1 = poll_fn(|waker| { 159 + local_wake(waker); 160 + x1 += 1; 161 + if x1 == 4 { 162 + Poll::Ready(x1) 163 163 } else { 164 164 Poll::Pending 165 165 } 166 166 }); 167 - let f2 = poll_fn(|wake| { 168 - wake.wake(); 169 - x2.set(x2.get() + 1); 170 - if x2.get() == 2 { 171 - Poll::Ready(x2.get()) 167 + let f2 = poll_fn(|waker| { 168 + local_wake(waker); 169 + x2 += 1; 170 + if x2 == 2 { 171 + Poll::Ready(x2) 172 172 } else { 173 173 Poll::Pending 174 174 } 175 175 }); 176 - let dummy_waker = noop_wake(); 176 + let guard = pin::pin!(ValueGuard::new(NonNull::new( 177 + &mut DummyWaker as *mut dyn Wake, 178 + ))); 177 179 let join = (f1, f2).race(); 178 - assert_eq!(join.poll(&dummy_waker), Poll::Pending); 179 - assert_eq!(join.poll(&dummy_waker), Poll::Ready(RaceOutputs2::B(2))); 180 + let mut pinned = pin::pin!(join); 181 + assert_eq!(pinned.as_mut().poll(guard.as_ref()), Poll::Pending); 182 + assert_eq!( 183 + pinned.poll(guard.as_ref()), 184 + Poll::Ready(RaceOutputs2::B(2)) 185 + ); 180 186 } 181 187 182 - #[test] 183 - fn never_wake() { 184 - let f1 = poll_fn(|_| Poll::<i32>::Pending); 185 - let f2 = poll_fn(|_| Poll::<i32>::Pending); 186 - let dummy_waker = noop_wake(); 187 - let join = (f1, f2).race(); 188 - for _ in 0..10 { 189 - assert_eq!(join.poll(&dummy_waker), Poll::Pending); 190 - } 191 - } 188 + // #[test] 189 + // fn never_wake() { 190 + // let f1 = poll_fn(|_| Poll::<i32>::Pending); 191 + // let f2 = poll_fn(|_| Poll::<i32>::Pending); 192 + // let dummy_waker = noop_wake(); 193 + // let join = (f1, f2).race(); 194 + // for _ in 0..10 { 195 + // assert_eq!(join.poll(&dummy_waker), Poll::Pending); 196 + // } 197 + // } 192 198 193 - #[test] 194 - fn basic() { 195 - let f1 = poll_fn(|_| Poll::Ready(1)); 196 - let f2 = poll_fn(|_| Poll::Ready(2)); 197 - let dummy_waker = noop_wake(); 198 - assert_eq!( 199 - f1.race_with(f2).poll(&dummy_waker), 200 - Poll::Ready(RaceOutputs2::A(1)) 201 - ); 202 - } 199 + // #[test] 200 + // fn basic() { 201 + // let f1 = poll_fn(|_| Poll::Ready(1)); 202 + // let f2 = poll_fn(|_| Poll::Ready(2)); 203 + // let dummy_waker = noop_wake(); 204 + // assert_eq!( 205 + // f1.race_with(f2).poll(&dummy_waker), 206 + // Poll::Ready(RaceOutputs2::A(1)) 207 + // ); 208 + // } 203 209 }
+42 -8
futures-combinators/src/wake.rs
··· 1 - use std::{array, cell::Cell, marker::PhantomPinned, pin::Pin, ptr::NonNull}; 1 + use std::{ 2 + array, cell::Cell, marker::PhantomPinned, pin::Pin, ptr::NonNull, 3 + task::Poll, 4 + }; 2 5 3 - use futures_compat::WakePtr; 6 + use futures_compat::{LocalWaker, WakePtr}; 4 7 use futures_core::Wake; 5 8 use lifetime_guard::{guard::RefGuard, guard::ValueGuard}; 6 9 ··· 98 101 } 99 102 } 100 103 101 - pub unsafe fn wake_bespoke_waker(waker: &std::task::Waker) { 102 - unsafe { 103 - let guard = futures_compat::waker_to_guard(waker); 104 - if let Some(wake) = guard.get() { 105 - (*wake.as_ptr()).wake(); 106 - } 104 + pub fn local_wake(guard: &LocalWaker) { 105 + if let Some(wake) = guard.get() { 106 + unsafe { (*wake.as_ptr()).wake() } 107 + } 108 + } 109 + 110 + // pub unsafe fn wake_bespoke_waker(waker: &std::task::Waker) { 111 + // unsafe { 112 + // let guard = futures_compat::waker_to_guard(waker); 113 + // if let Some(wake) = guard.get() { 114 + // (*wake.as_ptr()).wake(); 115 + // } 116 + // } 117 + // } 118 + 119 + pub struct PollFn<F, T>(F) 120 + where 121 + F: FnMut(&LocalWaker) -> Poll<T>; 122 + 123 + impl<F, T> futures_core::Future<LocalWaker> for PollFn<F, T> 124 + where 125 + F: FnMut(&LocalWaker) -> Poll<T>, 126 + { 127 + type Output = T; 128 + 129 + fn poll( 130 + self: Pin<&mut Self>, 131 + waker: Pin<&LocalWaker>, 132 + ) -> Poll<Self::Output> { 133 + (unsafe { &mut self.get_unchecked_mut().0 })(&waker) 107 134 } 135 + } 136 + 137 + pub fn poll_fn<F, T>(f: F) -> impl futures_core::Future<LocalWaker, Output = T> 138 + where 139 + F: FnMut(&LocalWaker) -> Poll<T>, 140 + { 141 + PollFn(f) 108 142 } 109 143 110 144 pub struct DummyWaker;