+1
.gitignore
+1
.gitignore
+20
-74
futures/src/combinators/join.rs
+20
-74
futures/src/combinators/join.rs
···
5
use std::mem;
6
use std::{pin::Pin, sync::atomic::Ordering};
7
use std::{sync::atomic::AtomicBool, task::Poll};
8
-
9
/// from yoshuawuyts/futures-concurrency
10
/// Wait for all futures to complete.
11
///
···
14
pub trait Join<'scope> {
15
/// The resulting output type.
16
type Output;
17
-
18
/// The [`Future`] implementation returned by this method.
19
type Future: ScopedFuture<'scope, Output = Self::Output>;
20
-
21
/// Waits for multiple futures to complete.
22
///
23
/// Awaits multiple futures simultaneously, returning the output of the futures
···
65
/// This function returns a new future which polls all futures concurrently.
66
fn join(self) -> Self::Future;
67
}
68
-
69
-
// "look at what they need for a fraction of our power" (more efficient join impl is regular join here)
70
-
// https://github.com/yoshuawuyts/futures-concurrency/blob/main/src/utils/wakers/array/waker.rs
71
-
// possibly copy large portions of futures-concurrency over here
72
-
73
-
// struct Waker<'scope> {
74
-
// parent_waker: Wake<'scope>,
75
-
// }
76
-
77
-
// impl<'scope> Waker<'scope> {
78
-
// fn wake(&mut self) {
79
-
// (*self.parent_waker)();
80
-
// }
81
-
// }
82
-
83
-
// /// implements unsafe logic for a set of wakers waking one waker
84
-
// pub struct WakerArray<'scope, const N: usize> {
85
-
// parent_waker: Option<&'scope dyn Wake<'scope>>,
86
-
// // TODO bit packing
87
-
// child_readiness: [bool; N],
88
-
// pub child_wakers: Option<[Waker<'scope>; N]>,
89
-
// }
90
-
91
-
// impl<'scope, const N: usize> WakerArray<'scope, N> {
92
-
// fn new() -> Self {
93
-
// Self {
94
-
// parent_waker: None,
95
-
// child_readiness: [false; N],
96
-
// child_wakers: None,
97
-
// }
98
-
// }
99
-
100
-
// fn register_parent_wake(&mut self, wake: Wake<'scope>) {
101
-
// self.parent_waker = Some(wake);
102
-
// self.child_wakers = Some(
103
-
// [Waker {
104
-
// parent_waker: &self.parent_waker,
105
-
// }; N],
106
-
// );
107
-
// }
108
-
// }
109
-
110
-
// would be rly nice if rust had java functional interfaces for wake(&mut Self)
111
-
112
struct WakeStore<'scope> {
113
-
// no extra storage bc None is 0x000 ptr
114
parent: Option<&'scope dyn Wake<'scope>>,
115
ready: AtomicBool,
116
}
117
-
118
impl<'scope> WakeStore<'scope> {
119
fn new() -> Self {
120
Self {
···
122
ready: true.into(),
123
}
124
}
125
-
126
fn take_ready(&mut self) -> bool {
127
self.ready.swap(false, Ordering::SeqCst)
128
}
129
}
130
-
131
impl<'scope> Wake<'scope> for WakeStore<'scope> {
132
fn wake(&self) {
133
self.ready.swap(true, Ordering::SeqCst);
···
137
}
138
}
139
140
-
// heavily based on https://github.com/yoshuawuyts/futures-concurrency
141
macro_rules! impl_join_tuple {
142
($StructName:ident $($F:ident)+) => {
143
-
144
-
// this exists to work around concatenating idents
145
-
// once https://doc.rust-lang.org/stable/unstable-book/language-features/macro-metavar-expr-concat.html is stable, the $StructName can just contain
146
-
// future_$F and waker_$F
147
#[allow(non_snake_case)]
148
struct Wakers<'scope> {
149
-
// inefficient, needs tt muncher for actual [T; LEN] traversal, fewer cache misses
150
$($F: WakeStore<'scope>,)*
151
}
152
153
#[allow(non_snake_case)]
154
pub struct $StructName<'scope, $($F: ScopedFuture<'scope>),+> {
155
-
// parent_waker: Option<&'scope dyn Wake>,
156
$($F: MaybeDone<'scope, $F>,)*
157
wakers: Wakers<'scope>,
158
}
159
160
-
impl<'scope, $($F: ScopedFuture<'scope> + 'scope),+> ScopedFuture<'scope> for $StructName<'scope, $($F),+>
161
{
162
type Output = ($($F::Output),+);
163
164
-
fn poll(self: Pin<&mut Self>, wake: &'scope dyn Wake<'scope>) -> Poll<Self::Output>
165
-
{
166
let this = unsafe { self.get_unchecked_mut() };
167
-
168
let mut ready = true;
169
170
$(
···
173
if let MaybeDone::Future(fut) = &mut this.$F {
174
ready &= if this.wakers.$F.take_ready() {
175
unsafe {
176
-
Pin::new_unchecked(fut).poll(mem::transmute(&this.wakers.$F as &dyn Wake)).is_ready()
177
}
178
} else {
179
false
···
182
)+
183
184
if ready {
185
-
Poll::Ready(($(
186
-
// unwrap_unchecked is safe here because we know all
187
-
// futures have been polled to completion
188
-
// (`MaybeDone::Done`) and have never been converted
189
-
// to `MaybeDone::Gone`
190
-
unsafe { Pin::new_unchecked(&mut this.$F).take_output().unwrap_unchecked() },
191
-
)*))
192
} else {
193
Poll::Pending
194
}
195
-
196
}
197
}
198
···
202
203
#[allow(non_snake_case)]
204
fn join(self) -> Self::Future {
205
-
let ($($F),+): ($($F),+) = self;
206
$StructName {
207
-
// parent_waker: Option::None,
208
$($F: maybe_done($F),)*
209
-
wakers: Wakers { $($F: WakeStore::new(),)* }
210
}
211
}
212
}
···
5
use std::mem;
6
use std::{pin::Pin, sync::atomic::Ordering};
7
use std::{sync::atomic::AtomicBool, task::Poll};
8
/// from yoshuawuyts/futures-concurrency
9
/// Wait for all futures to complete.
10
///
···
13
pub trait Join<'scope> {
14
/// The resulting output type.
15
type Output;
16
/// The [`Future`] implementation returned by this method.
17
type Future: ScopedFuture<'scope, Output = Self::Output>;
18
/// Waits for multiple futures to complete.
19
///
20
/// Awaits multiple futures simultaneously, returning the output of the futures
···
62
/// This function returns a new future which polls all futures concurrently.
63
fn join(self) -> Self::Future;
64
}
65
struct WakeStore<'scope> {
66
parent: Option<&'scope dyn Wake<'scope>>,
67
ready: AtomicBool,
68
}
69
impl<'scope> WakeStore<'scope> {
70
fn new() -> Self {
71
Self {
···
73
ready: true.into(),
74
}
75
}
76
fn take_ready(&mut self) -> bool {
77
self.ready.swap(false, Ordering::SeqCst)
78
}
79
}
80
impl<'scope> Wake<'scope> for WakeStore<'scope> {
81
fn wake(&self) {
82
self.ready.swap(true, Ordering::SeqCst);
···
86
}
87
}
88
89
macro_rules! impl_join_tuple {
90
($StructName:ident $($F:ident)+) => {
91
#[allow(non_snake_case)]
92
struct Wakers<'scope> {
93
$($F: WakeStore<'scope>,)*
94
}
95
96
#[allow(non_snake_case)]
97
pub struct $StructName<'scope, $($F: ScopedFuture<'scope>),+> {
98
$($F: MaybeDone<'scope, $F>,)*
99
wakers: Wakers<'scope>,
100
}
101
102
+
impl<'scope, $($F: ScopedFuture<'scope> + 'scope),+> ScopedFuture<'scope>
103
+
for $StructName<'scope, $($F),+>
104
{
105
type Output = ($($F::Output),+);
106
107
+
fn poll(self: Pin<&mut Self>, wake: &'scope dyn Wake<'scope>) -> Poll<Self::Output> {
108
let this = unsafe { self.get_unchecked_mut() };
109
let mut ready = true;
110
111
$(
···
114
if let MaybeDone::Future(fut) = &mut this.$F {
115
ready &= if this.wakers.$F.take_ready() {
116
unsafe {
117
+
Pin::new_unchecked(fut).poll(
118
+
mem::transmute::<&dyn Wake<'scope>, &'scope dyn Wake<'scope>>(
119
+
&this.wakers.$F
120
+
)
121
+
).is_ready()
122
}
123
} else {
124
false
···
127
)+
128
129
if ready {
130
+
Poll::Ready((
131
+
$(
132
+
unsafe {
133
+
Pin::new_unchecked(&mut this.$F)
134
+
.take_output()
135
+
.unwrap_unchecked()
136
+
},
137
+
)*
138
+
))
139
} else {
140
Poll::Pending
141
}
142
}
143
}
144
···
148
149
#[allow(non_snake_case)]
150
fn join(self) -> Self::Future {
151
+
let ($($F),+) = self;
152
+
153
$StructName {
154
$($F: maybe_done($F),)*
155
+
wakers: Wakers { $($F: WakeStore::new(),)* },
156
}
157
}
158
}
+4
-1
futures/src/future.rs
+4
-1
futures/src/future.rs
-2
futures/src/lib.rs
-2
futures/src/lib.rs
+14
-4
futures/src/utils/maybe_done.rs
+14
-4
futures/src/utils/maybe_done.rs
···
21
Gone,
22
}
23
24
-
impl<'scope, Fut: ScopedFuture<'scope> + Unpin> Unpin for MaybeDone<'scope, Fut> {}
25
26
/// Wraps a future into a `MaybeDone`
27
///
···
41
/// assert_eq!(future.as_mut().take_output(), None);
42
/// # });
43
/// ```
44
-
pub fn maybe_done<'scope, Fut: ScopedFuture<'scope>>(future: Fut) -> MaybeDone<'scope, Fut> {
45
assert_future::<(), _>(MaybeDone::Future(future))
46
}
47
···
86
// }
87
// }
88
89
-
impl<'scope, Fut: ScopedFuture<'scope>> ScopedFuture<'scope> for MaybeDone<'scope, Fut> {
90
type Output = ();
91
92
-
fn poll(mut self: Pin<&mut Self>, cx: &'scope dyn Wake<'scope>) -> Poll<Self::Output>
93
// where
94
// 'scope: 'react,
95
{
···
21
Gone,
22
}
23
24
+
impl<'scope, Fut: ScopedFuture<'scope> + Unpin> Unpin
25
+
for MaybeDone<'scope, Fut>
26
+
{
27
+
}
28
29
/// Wraps a future into a `MaybeDone`
30
///
···
44
/// assert_eq!(future.as_mut().take_output(), None);
45
/// # });
46
/// ```
47
+
pub fn maybe_done<'scope, Fut: ScopedFuture<'scope>>(
48
+
future: Fut,
49
+
) -> MaybeDone<'scope, Fut> {
50
assert_future::<(), _>(MaybeDone::Future(future))
51
}
52
···
91
// }
92
// }
93
94
+
impl<'scope, Fut: ScopedFuture<'scope>> ScopedFuture<'scope>
95
+
for MaybeDone<'scope, Fut>
96
+
{
97
type Output = ();
98
99
+
fn poll(
100
+
mut self: Pin<&mut Self>,
101
+
cx: &'scope dyn Wake<'scope>,
102
+
) -> Poll<Self::Output>
103
// where
104
// 'scope: 'react,
105
{
+1
rustfmt.toml
+1
rustfmt.toml
···
···
1
+
max_width = 80