added race combinator

Changed files
+256 -19
futures
src
combinators
futures-combinators
futures-util
+34 -18
futures-combinators/src/join.rs
··· 24 24 fn join(self) -> Self::Future; 25 25 } 26 26 27 - macro_rules! impl_join_tuple { 28 - ($namespace: ident $StructName:ident $($F:ident)+) => { 27 + pub trait JoinExt<'scope> { 28 + fn along_with<Fut>(self, other: Fut) -> Join2<'scope, Self, Fut> 29 + where 30 + Self: Sized + 'scope + ScopedFuture<'scope>, 31 + Fut: ScopedFuture<'scope> + 'scope, 32 + { 33 + (self, other).join() 34 + } 35 + } 36 + 37 + impl<'scope, T> JoinExt<'scope> for T where T: ScopedFuture<'scope> {} 29 38 39 + macro_rules! impl_join_tuple { 40 + ($namespace:ident $StructName:ident $($F:ident)+) => { 30 41 mod $namespace { 31 42 use super::*; 32 43 ··· 35 46 $(pub $F: WakeStore<'scope>,)* 36 47 } 37 48 38 - // this is so stupid 39 49 #[allow(non_snake_case)] 40 50 pub struct WakerRefs<'scope> { 41 51 $(pub $F: Cell<Option<&'scope dyn Wake<'scope>>>,)* ··· 59 69 let mut ready = true; 60 70 61 71 $( 62 - self.wakers.$F.set_parent(wake) ; 72 + self.wakers.$F.set_parent(wake); 63 73 self.refs.$F.replace(Some(&self.wakers.$F)); 64 74 65 75 if !self.$F.is_done() { 66 76 ready &= if self.wakers.$F.take_ready() { 67 - // by polling the future, we create our self referentials truct for lifetime 'scope 68 - // # SAFETY 69 - // unwrap_unchecked is safe because we just put a Some value into our refs.$F 70 - // so it is guaranteed to be Some 71 - self.$F.poll(unsafe { (&self.refs.$F.get()).unwrap_unchecked() }).is_ready() 77 + // By polling the future, we create our self-referential structure for lifetime `'scope`. 78 + // 79 + // SAFETY: 80 + // `unwrap_unchecked` is safe because we just inserted `Some` into `refs.$F`, 81 + // so it is guaranteed to be `Some`. 82 + self.$F 83 + .poll(unsafe { (&self.refs.$F.get()).unwrap_unchecked() }) 84 + .is_ready() 72 85 } else { 73 86 false 74 87 }; ··· 78 91 if ready { 79 92 Poll::Ready(( 80 93 $( 81 - // # SAFETY 82 - // `ready == true` when all futures are already 83 - // complete or just complete. Once not `MaybeDoneState::Future`, futures transition to `MaybeDoneState::Done`. We don't poll them after, or take their outputs so we know the result of `take_output` must be `Some` 94 + // SAFETY: 95 + // `ready == true` when all futures are complete. 96 + // Once a future is not `MaybeDoneState::Future`, it transitions to `Done`, 97 + // so we know the result of `take_output` must be `Some`. 84 98 unsafe { 85 - self.$F 86 - .take_output() 87 - .unwrap_unchecked() 99 + self.$F.take_output().unwrap_unchecked() 88 100 }, 89 101 )* 90 102 )) ··· 104 116 105 117 $StructName { 106 118 $($F: maybe_done($F),)* 107 - wakers: $namespace::Wakers { $($F: WakeStore::new(),)* }, 108 - refs: $namespace::WakerRefs { $($F: Option::None.into(),)* } 119 + wakers: $namespace::Wakers { 120 + $($F: WakeStore::new(),)* 121 + }, 122 + refs: $namespace::WakerRefs { 123 + $($F: Option::None.into(),)* 124 + }, 109 125 } 110 126 } 111 127 } ··· 178 194 let f1 = poll_fn(|_| Poll::Ready(1)); 179 195 let f2 = poll_fn(|_| Poll::Ready(2)); 180 196 let dummy_waker = noop_wake(); 181 - assert_eq!((f1, f2).join().poll(&dummy_waker), Poll::Ready((1, 2))); 197 + assert_eq!(f1.along_with(f2).poll(&dummy_waker), Poll::Ready((1, 2))); 182 198 } 183 199 }
+4
futures-combinators/src/lib.rs
··· 1 1 mod join; 2 + mod race; 3 + 4 + use join::*; 5 + use race::*;
+203
futures-combinators/src/race.rs
··· 1 + use futures_core::{ScopedFuture, Wake}; 2 + use futures_util::WakeStore; 3 + use std::{cell::Cell, task::Poll}; 4 + 5 + /// from [futures-concurrency](https://github.com/yoshuawuyts/futures-concurrency/tree/main) 6 + /// Wait for all futures to complete. 7 + /// 8 + /// Awaits multiple futures simultaneously, returning the output of the futures 9 + /// in the same container type they were created once all complete. 10 + pub trait Race<'scope> { 11 + /// The resulting output type. 12 + type Output; 13 + 14 + /// The [`ScopedFuture`] implementation returned by this method. 15 + type Future: ScopedFuture<'scope, Output = Self::Output>; 16 + 17 + /// Waits for multiple futures to complete. 18 + /// 19 + /// Awaits multiple futures simultaneously, returning the output of the futures 20 + /// in the same container type they we're created once all complete. 21 + /// 22 + /// This function returns a new future which polls all futures concurrently. 23 + fn race(self) -> Self::Future; 24 + } 25 + 26 + pub trait RaceExt<'scope> { 27 + fn race_with<Fut>(self, other: Fut) -> Race2<'scope, Self, Fut> 28 + where 29 + Self: Sized + 'scope + ScopedFuture<'scope>, 30 + Fut: ScopedFuture<'scope> + 'scope, 31 + { 32 + (self, other).race() 33 + } 34 + } 35 + 36 + impl<'scope, T> RaceExt<'scope> for T where T: ScopedFuture<'scope> {} 37 + 38 + macro_rules! impl_race_tuple { 39 + ($namespace:ident $StructName:ident $OutputsName:ident $($F:ident)+) => { 40 + mod $namespace { 41 + use super::*; 42 + 43 + #[allow(non_snake_case)] 44 + pub struct Wakers<'scope> { 45 + $(pub $F: WakeStore<'scope>,)* 46 + } 47 + 48 + #[allow(non_snake_case)] 49 + pub struct WakerRefs<'scope> { 50 + $(pub $F: Cell<Option<&'scope dyn Wake<'scope>>>,)* 51 + } 52 + } 53 + 54 + pub enum $OutputsName<$($F,)+> { 55 + $($F($F),)+ 56 + } 57 + 58 + impl<$($F: std::fmt::Debug,)+> std::fmt::Debug for $OutputsName<$($F,)+> { 59 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 60 + match self {$( 61 + Self::$F(x) => 62 + f.debug_tuple(std::stringify!($F)) 63 + .field(x) 64 + .finish(), 65 + )+} 66 + } 67 + } 68 + 69 + impl<$($F: PartialEq,)+> PartialEq for $OutputsName<$($F,)+> { 70 + fn eq(&self, other: &Self) -> bool { 71 + match (self, other) { 72 + $((Self::$F(a), Self::$F(b)) => a == b,)+ 73 + _ => false 74 + } 75 + } 76 + } 77 + 78 + #[allow(non_snake_case)] 79 + #[must_use = "futures do nothing unless you `.await` or poll them"] 80 + pub struct $StructName<'scope, $($F: ScopedFuture<'scope>),+> { 81 + $($F: $F,)* 82 + wakers: $namespace::Wakers<'scope>, 83 + refs: $namespace::WakerRefs<'scope>, 84 + } 85 + 86 + impl<'scope, $($F: ScopedFuture<'scope> + 'scope),+> ScopedFuture<'scope> 87 + for $StructName<'scope, $($F),+> 88 + { 89 + type Output = $OutputsName<$($F::Output,)+>; 90 + 91 + fn poll(&'scope self, wake: &'scope dyn Wake<'scope>) -> Poll<Self::Output> { 92 + $( 93 + self.wakers.$F.set_parent(wake); 94 + self.refs.$F.replace(Some(&self.wakers.$F)); 95 + 96 + if self.wakers.$F.take_ready() { 97 + // By polling the future, we create our self-referential structure for lifetime `'scope`. 98 + // 99 + // SAFETY: 100 + // `unwrap_unchecked` is safe because we just inserted `Some` into `refs.$F`, 101 + // so it is guaranteed to be `Some`. 102 + if let Poll::Ready(res) = self.$F.poll(unsafe { (&self.refs.$F.get()).unwrap_unchecked() }) { 103 + return Poll::Ready($OutputsName::$F(res)); 104 + } 105 + } 106 + )+ 107 + 108 + Poll::Pending 109 + } 110 + } 111 + 112 + impl<'scope, $($F: ScopedFuture<'scope> + 'scope),+> Race<'scope> for ($($F),+) { 113 + type Output = $OutputsName<$($F::Output),*>; 114 + type Future = $StructName<'scope, $($F),+>; 115 + 116 + #[allow(non_snake_case)] 117 + fn race(self) -> Self::Future { 118 + let ($($F),+) = self; 119 + 120 + $StructName { 121 + $($F: $F,)* 122 + wakers: $namespace::Wakers { 123 + $($F: WakeStore::new(),)* 124 + }, 125 + refs: $namespace::WakerRefs { 126 + $($F: Option::None.into(),)* 127 + }, 128 + } 129 + } 130 + } 131 + }; 132 + } 133 + 134 + impl_race_tuple!(race2 Race2 RaceOutputs2 A B); 135 + impl_race_tuple!(race3 Race3 RaceOutputs3 A B C); 136 + impl_race_tuple!(race4 Race4 RaceOutputs4 A B C D); 137 + impl_race_tuple!(race5 Race5 RaceOutputs5 A B C D E); 138 + impl_race_tuple!(race6 Race6 RaceOutputs6 A B C D E F); 139 + impl_race_tuple!(race7 Race7 RaceOutputs7 A B C D E F G); 140 + impl_race_tuple!(race8 Race8 RaceOutputs8 A B C D E F G H); 141 + impl_race_tuple!(race9 Race9 RaceOutputs9 A B C D E F G H I); 142 + impl_race_tuple!(race10 Race10 RaceOutputs10 A B C D E F G H I J); 143 + impl_race_tuple!(race11 Race11 RaceOutputs11 A B C D E F G H I J K); 144 + impl_race_tuple!(race12 Race12 RaceOutputs12 A B C D E F G H I J K L); 145 + 146 + #[cfg(test)] 147 + mod tests { 148 + #![no_std] 149 + 150 + use futures_util::{noop_wake, poll_fn}; 151 + 152 + use super::*; 153 + 154 + #[test] 155 + fn counters() { 156 + let x1 = Cell::new(0); 157 + let x2 = Cell::new(0); 158 + let f1 = poll_fn(|wake| { 159 + wake.wake(); 160 + x1.set(x1.get() + 1); 161 + if x1.get() == 4 { 162 + Poll::Ready(x1.get()) 163 + } else { 164 + Poll::Pending 165 + } 166 + }); 167 + let f2 = poll_fn(|wake| { 168 + wake.wake(); 169 + x2.set(x2.get() + 1); 170 + if x2.get() == 2 { 171 + Poll::Ready(x2.get()) 172 + } else { 173 + Poll::Pending 174 + } 175 + }); 176 + let dummy_waker = noop_wake(); 177 + let join = (f1, f2).race(); 178 + assert_eq!(join.poll(&dummy_waker), Poll::Pending); 179 + assert_eq!(join.poll(&dummy_waker), Poll::Ready(RaceOutputs2::B(2))); 180 + } 181 + 182 + #[test] 183 + fn never_wake() { 184 + let f1 = poll_fn(|_| Poll::<i32>::Pending); 185 + let f2 = poll_fn(|_| Poll::<i32>::Pending); 186 + let dummy_waker = noop_wake(); 187 + let join = (f1, f2).race(); 188 + for _ in 0..10 { 189 + assert_eq!(join.poll(&dummy_waker), Poll::Pending); 190 + } 191 + } 192 + 193 + #[test] 194 + fn basic() { 195 + let f1 = poll_fn(|_| Poll::Ready(1)); 196 + let f2 = poll_fn(|_| Poll::Ready(2)); 197 + let dummy_waker = noop_wake(); 198 + assert_eq!( 199 + f1.race_with(f2).poll(&dummy_waker), 200 + Poll::Ready(RaceOutputs2::A(1)) 201 + ); 202 + } 203 + }
+14
futures-util/src/ext.rs
··· 1 + pub trait FutureExt: Future { 2 + fn along_with(self, other: impl Future) 3 + where 4 + Self: Sized, 5 + { 6 + // Join2 7 + } 8 + // fn then(self, other: impl Future) 9 + // where 10 + // Self: Sized, 11 + // { 12 + 13 + // } 14 + }
+1
futures-util/src/lib.rs
··· 1 + mod ext; 1 2 mod maybe_done; 2 3 mod poll_fn; 3 4 mod wakers;
-1
futures/src/combinators/mod.rs
··· 1 - mod join;