wip

surely this isn't working

+90 -78
+75 -69
futures/src/combinators/join.rs
··· 2 2 future::{ScopedFuture, Wake}, 3 3 utils::{MaybeDone, maybe_done}, 4 4 }; 5 - use std::pin::Pin; 6 - use std::task::Poll; 5 + use std::mem; 6 + use std::{pin::Pin, sync::atomic::Ordering}; 7 + use std::{sync::atomic::AtomicBool, task::Poll}; 7 8 8 9 /// from yoshuawuyts/futures-concurrency 9 10 /// Wait for all futures to complete. ··· 69 70 // https://github.com/yoshuawuyts/futures-concurrency/blob/main/src/utils/wakers/array/waker.rs 70 71 // possibly copy large portions of futures-concurrency over here 71 72 72 - /// implements unsafe logic for a set of wakers waking one waker 73 - pub struct WakerArray<'scope, const N: usize> { 74 - parent_waker: Option<Wake<'scope>>, 75 - // TODO bit packing 76 - child_readiness: [bool; N], 77 - child_wakers: [Wake<'scope>; N], 78 - } 73 + // struct Waker<'scope> { 74 + // parent_waker: Wake<'scope>, 75 + // } 76 + 77 + // impl<'scope> Waker<'scope> { 78 + // fn wake(&mut self) { 79 + // (*self.parent_waker)(); 80 + // } 81 + // } 79 82 80 - impl<'scope, const N: usize> WakerArray<'scope, N> { 81 - fn new() -> Self { 82 - // let mut this = Self { 83 - // parent_waker: None, 84 - // child_readiness: [false; N], 85 - // child_wakers: [|| {}; N], 86 - // }; 87 - // this.child_wakers = [|x| 2 * x; N]; 88 - // this 89 - } 90 - } 83 + // /// implements unsafe logic for a set of wakers waking one waker 84 + // pub struct WakerArray<'scope, const N: usize> { 85 + // parent_waker: Option<&'scope dyn Wake<'scope>>, 86 + // // TODO bit packing 87 + // child_readiness: [bool; N], 88 + // pub child_wakers: Option<[Waker<'scope>; N]>, 89 + // } 90 + 91 + // impl<'scope, const N: usize> WakerArray<'scope, N> { 92 + // fn new() -> Self { 93 + // Self { 94 + // parent_waker: None, 95 + // child_readiness: [false; N], 96 + // child_wakers: None, 97 + // } 98 + // } 99 + 100 + // fn register_parent_wake(&mut self, wake: Wake<'scope>) { 101 + // self.parent_waker = Some(wake); 102 + // self.child_wakers = Some( 103 + // [Waker { 104 + // parent_waker: &self.parent_waker, 105 + // }; N], 106 + // ); 107 + // } 108 + // } 91 109 92 110 // would be rly nice if rust had java functional interfaces for wake(&mut Self) 93 111 94 - // TODO bit packing 95 112 struct WakeStore<'scope> { 96 - ready: bool, 97 - parent: Wake<'scope>, 113 + // no extra storage bc None is 0x000 ptr 114 + parent: Option<&'scope dyn Wake<'scope>>, 115 + ready: AtomicBool, 98 116 } 99 117 100 118 impl<'scope> WakeStore<'scope> { 101 - fn new(parent: Wake<'scope>) -> Self { 119 + fn new() -> Self { 102 120 Self { 103 - parent, 104 - ready: true, 121 + parent: Option::None, 122 + ready: true.into(), 105 123 } 106 124 } 107 125 108 126 fn take_ready(&mut self) -> bool { 109 - let out = self.ready; 110 - self.ready = false; 111 - out 127 + self.ready.swap(false, Ordering::SeqCst) 112 128 } 113 129 } 114 130 115 - // impl ScopedWake for WakeStore<'_> { 116 - // fn wake(&mut self) { 117 - // self.ready = true; 118 - // self.parent.wake(); 119 - // } 120 - // } 131 + impl<'scope> Wake<'scope> for WakeStore<'scope> { 132 + fn wake(&self) { 133 + self.ready.swap(true, Ordering::SeqCst); 134 + if let Some(parent) = self.parent { 135 + parent.wake(); 136 + } 137 + } 138 + } 121 139 122 140 // heavily based on https://github.com/yoshuawuyts/futures-concurrency 123 141 macro_rules! impl_join_tuple { ··· 128 146 // future_$F and waker_$F 129 147 #[allow(non_snake_case)] 130 148 struct Wakers<'scope> { 149 + // inefficient, needs tt muncher for actual [T; LEN] traversal, fewer cache misses 131 150 $($F: WakeStore<'scope>,)* 132 151 } 133 152 134 153 #[allow(non_snake_case)] 135 154 pub struct $StructName<'scope, $($F: ScopedFuture<'scope>),+> { 136 - parent_waker: Option<Wake<'scope>>, 155 + // parent_waker: Option<&'scope dyn Wake>, 137 156 $($F: MaybeDone<'scope, $F>,)* 138 157 wakers: Wakers<'scope>, 139 158 } 140 159 141 - impl<'scope, $($F: ScopedFuture<'scope>),+> ScopedFuture<'scope> for $StructName<'scope, $($F),+> { 160 + impl<'scope, $($F: ScopedFuture<'scope> + 'scope),+> ScopedFuture<'scope> for $StructName<'scope, $($F),+> 161 + { 142 162 type Output = ($($F::Output),+); 143 163 144 - fn register_wake(self: Pin<&mut Self>, waker: Wake<'scope>) { 145 - unsafe { self.get_unchecked_mut() }.parent_waker = Some(waker); 146 - } 147 - 148 - fn poll(self: Pin<&mut Self>) -> Poll<Self::Output> { 164 + fn poll(self: Pin<&mut Self>, wake: &'scope dyn Wake<'scope>) -> Poll<Self::Output> 165 + { 149 166 let this = unsafe { self.get_unchecked_mut() }; 150 167 151 168 let mut ready = true; 152 169 153 - // "loop" through all ready futures, poll if ready 154 - // 155 - // this combinator is complete when all internal futures have 156 - // polled to completion 157 170 $( 158 - // let $F = unsafe { &mut self.map_unchecked_mut(|f| &mut f.$F) }; 171 + this.wakers.$F.parent = Some(wake); 172 + 159 173 if let MaybeDone::Future(fut) = &mut this.$F { 160 174 ready &= if this.wakers.$F.take_ready() { 161 - unsafe { Pin::new_unchecked(fut) }.poll().is_ready() 175 + unsafe { 176 + Pin::new_unchecked(fut).poll(mem::transmute(&this.wakers.$F as &dyn Wake)).is_ready() 177 + } 162 178 } else { 163 179 false 164 180 }; ··· 167 183 168 184 if ready { 169 185 Poll::Ready(($( 170 - // unwrap_unchecked is safe here because we know all 171 - // futures have been polled to completion 172 - // (`MaybeDone::Done`) and have never been converted 173 - // to `MaybeDone::Gone` 174 - unsafe { Pin::new_unchecked(&mut this.$F).take_output().unwrap_unchecked() }, 186 + // unwrap_unchecked is safe here because we know all 187 + // futures have been polled to completion 188 + // (`MaybeDone::Done`) and have never been converted 189 + // to `MaybeDone::Gone` 190 + unsafe { Pin::new_unchecked(&mut this.$F).take_output().unwrap_unchecked() }, 175 191 )*)) 176 192 } else { 177 193 Poll::Pending ··· 180 196 } 181 197 } 182 198 183 - // impl<'scope, $($F: ScopedFuture<'scope>),+> Wake<'scope> for $StructName<'scope, $($F),+> { 184 - // fn wake(&mut self) { 185 - // if let Some(waker) = &mut self.parent_waker { 186 - // waker.wake(); 187 - // }; 188 - // } 189 - // } 190 - 191 - impl<'scope, $($F: ScopedFuture<'scope>),+> Join<'scope> for ($($F),+) { 199 + impl<'scope, $($F: ScopedFuture<'scope> + 'scope),+> Join<'scope> for ($($F),+) { 192 200 type Output = ($($F::Output),*); 193 201 type Future = $StructName<'scope, $($F),+>; 194 202 195 203 #[allow(non_snake_case)] 196 204 fn join(self) -> Self::Future { 197 205 let ($($F),+): ($($F),+) = self; 198 - // $StructName { 199 - // parent_waker: Option::None, 200 - // $($F: maybe_done($F),)* 201 - // wakers: $(Wakers { $F: WakeStore::new(&mut Self) }),* 202 - // } 203 - todo!() 204 - // TODO register all wakers 206 + $StructName { 207 + // parent_waker: Option::None, 208 + $($F: maybe_done($F),)* 209 + wakers: Wakers { $($F: WakeStore::new(),)* } 210 + } 205 211 } 206 212 } 207 213 };
+9 -6
futures/src/future.rs
··· 4 4 /// 5 5 /// This acts as a handle for a reactor to indicate when a `ScopedFuture` is 6 6 /// once again ready to be polled. 7 - pub type Wake<'scope> = &'scope dyn FnMut() -> (); 7 + pub trait Wake<'scope> { 8 + fn wake(&self); 9 + } 8 10 9 11 /// ScopedFuture represents a unit of asynchronous computation that must be 10 12 /// polled by an external actor. ··· 42 44 /// If waker is ownable/cloneable, that erases the lifetime's importance. 43 45 /// If the waker is a non clonable mutable reference that lives for 'scope, 44 46 /// it cannot be passed into `poll` every time the future is polled, instead it 45 - /// must only be registered once. 47 + /// must only be registered once, leading to a register_waker api that is very 48 + /// cumbersome without unsafe poll/unsafe register_waker. Instead, it's easier 49 + /// to use a non clonable immutable reference and have waking occur via 50 + /// interior mutability (this is fine since combinators rely on interior 51 + /// mutability anyway for a 1 parent : many children waker relationship) 46 52 pub trait ScopedFuture<'scope> { 47 53 type Output; 48 54 49 - // these could be made unsafe to guarantee polling to completion? 50 - fn register_wake(self: Pin<&mut Self>, wake: Wake<'scope>); 51 - 52 - fn poll(self: Pin<&mut Self>) -> Poll<Self::Output>; 55 + fn poll(self: Pin<&mut Self>, wake: &'scope dyn Wake<'scope>) -> Poll<Self::Output>; 53 56 }
+6 -3
futures/src/utils/maybe_done.rs
··· 1 1 //! Definition of the MaybeDone combinator 2 2 3 - use crate::future::{ScopedFuture, ScopedWake}; 3 + use crate::future::{ScopedFuture, Wake}; 4 4 5 5 use super::assert_future; 6 6 use core::mem; 7 7 use core::pin::Pin; 8 - use std::task::Poll; 8 + use std::task::{Poll, ready}; 9 9 10 10 /// A future that may have completed. 11 11 /// ··· 89 89 impl<'scope, Fut: ScopedFuture<'scope>> ScopedFuture<'scope> for MaybeDone<'scope, Fut> { 90 90 type Output = (); 91 91 92 - fn poll(mut self: Pin<&mut Self>, cx: &'scope mut dyn ScopedWake) -> Poll<Self::Output> { 92 + fn poll(mut self: Pin<&mut Self>, cx: &'scope dyn Wake<'scope>) -> Poll<Self::Output> 93 + // where 94 + // 'scope: 'react, 95 + { 93 96 unsafe { 94 97 match self.as_mut().get_unchecked_mut() { 95 98 Self::Future(f) => {