+23
-46
futures-combinators/src/join.rs
+23
-46
futures-combinators/src/join.rs
···
1
use futures_core::{ScopedFuture, Wake};
2
use futures_util::{MaybeDone, MaybeDoneState, maybe_done};
3
-
use std::cell::{Cell, UnsafeCell};
4
-
use std::sync::atomic::Ordering;
5
-
use std::{sync::atomic::AtomicBool, task::Poll};
6
7
-
/// from yoshuawuyts/futures-concurrency
8
/// Wait for all futures to complete.
9
///
10
/// Awaits multiple futures simultaneously, returning the output of the futures
···
12
pub trait Join<'scope> {
13
/// The resulting output type.
14
type Output;
15
/// The [`ScopedFuture`] implementation returned by this method.
16
type Future: ScopedFuture<'scope, Output = Self::Output>;
17
/// Waits for multiple futures to complete.
18
///
19
/// Awaits multiple futures simultaneously, returning the output of the futures
20
/// in the same container type they we're created once all complete.
21
///
22
-
/// # Examples
23
-
///
24
-
/// Awaiting multiple futures of the same type can be done using either a vector
25
-
/// or an array.
26
-
/// ```rust
27
-
/// # futures::executor::block_on(async {
28
-
/// use futures_concurrency::prelude::*;
29
-
///
30
-
/// // all futures passed here are of the same type
31
-
/// let fut1 = core::future::ready(1);
32
-
/// let fut2 = core::future::ready(2);
33
-
/// let fut3 = core::future::ready(3);
34
-
///
35
-
/// let outputs = [fut1, fut2, fut3].join().await;
36
-
/// assert_eq!(outputs, [1, 2, 3]);
37
-
/// # })
38
-
/// ```
39
-
///
40
-
/// In practice however, it's common to want to await multiple futures of
41
-
/// different types. For example if you have two different `async {}` blocks,
42
-
/// you want to `.await`. To do that, you can call `.join` on tuples of futures.
43
-
/// ```rust
44
-
/// # futures::executor::block_on(async {
45
-
/// use futures_concurrency::prelude::*;
46
-
///
47
-
/// async fn some_async_fn() -> usize { 3 }
48
-
///
49
-
/// // the futures passed here are of different types
50
-
/// let fut1 = core::future::ready(1);
51
-
/// let fut2 = async { 2 };
52
-
/// let fut3 = some_async_fn();
53
-
/// // ^ NOTE: no `.await` here!
54
-
///
55
-
/// let outputs = (fut1, fut2, fut3).join().await;
56
-
/// assert_eq!(outputs, (1, 2, 3));
57
-
/// # })
58
-
/// ```
59
-
///
60
-
/// <br><br>
61
/// This function returns a new future which polls all futures concurrently.
62
fn join(self) -> Self::Future;
63
}
64
65
struct WakeStore<'scope> {
66
parent: Cell<Option<&'scope dyn Wake<'scope>>>,
67
-
ready: AtomicBool,
68
}
69
70
impl<'scope> WakeStore<'scope> {
···
75
}
76
}
77
fn take_ready(&self) -> bool {
78
-
self.ready.swap(false, Ordering::SeqCst)
79
}
80
}
81
82
impl<'scope> Wake<'scope> for WakeStore<'scope> {
83
fn wake(&self) {
84
-
self.ready.swap(true, Ordering::SeqCst);
85
if let Some(parent) = &self.parent.get() {
86
parent.wake();
87
}
···
107
}
108
109
#[allow(non_snake_case)]
110
pub struct $StructName<'scope, $($F: ScopedFuture<'scope>),+> {
111
$($F: MaybeDone<'scope, $F>,)*
112
wakers: $namespace::Wakers<'scope>,
···
190
impl_join_tuple!(join10 Join10 A B C D E F G H I J);
191
impl_join_tuple!(join11 Join11 A B C D E F G H I J K);
192
impl_join_tuple!(join12 Join12 A B C D E F G H I J K L);
···
1
use futures_core::{ScopedFuture, Wake};
2
use futures_util::{MaybeDone, MaybeDoneState, maybe_done};
3
+
use std::{cell::Cell, task::Poll};
4
5
+
/// from [futures-concurrency](https://github.com/yoshuawuyts/futures-concurrency/tree/main)
6
/// Wait for all futures to complete.
7
///
8
/// Awaits multiple futures simultaneously, returning the output of the futures
···
10
pub trait Join<'scope> {
11
/// The resulting output type.
12
type Output;
13
+
14
/// The [`ScopedFuture`] implementation returned by this method.
15
type Future: ScopedFuture<'scope, Output = Self::Output>;
16
+
17
/// Waits for multiple futures to complete.
18
///
19
/// Awaits multiple futures simultaneously, returning the output of the futures
20
/// in the same container type they we're created once all complete.
21
///
22
/// This function returns a new future which polls all futures concurrently.
23
fn join(self) -> Self::Future;
24
}
25
26
struct WakeStore<'scope> {
27
parent: Cell<Option<&'scope dyn Wake<'scope>>>,
28
+
ready: Cell<bool>,
29
}
30
31
impl<'scope> WakeStore<'scope> {
···
36
}
37
}
38
fn take_ready(&self) -> bool {
39
+
self.ready.replace(false)
40
}
41
}
42
43
impl<'scope> Wake<'scope> for WakeStore<'scope> {
44
fn wake(&self) {
45
+
self.ready.replace(true);
46
if let Some(parent) = &self.parent.get() {
47
parent.wake();
48
}
···
68
}
69
70
#[allow(non_snake_case)]
71
+
#[must_use = "futures do nothing unless you `.await` or poll them"]
72
pub struct $StructName<'scope, $($F: ScopedFuture<'scope>),+> {
73
$($F: MaybeDone<'scope, $F>,)*
74
wakers: $namespace::Wakers<'scope>,
···
152
impl_join_tuple!(join10 Join10 A B C D E F G H I J);
153
impl_join_tuple!(join11 Join11 A B C D E F G H I J K);
154
impl_join_tuple!(join12 Join12 A B C D E F G H I J K L);
155
+
156
+
#[cfg(test)]
157
+
mod tests {
158
+
use futures_util::poll_fn;
159
+
160
+
use super::*;
161
+
162
+
#[test]
163
+
fn basic() {
164
+
let f1 = poll_fn(|_| Poll::Ready(1));
165
+
let f2 = poll_fn(|_| Poll::Ready(2));
166
+
let dummy_waker = WakeStore::new();
167
+
assert_eq!((f1, f2).join().poll(&dummy_waker), Poll::Ready((1, 2)));
168
+
}
169
+
}
+2
futures-util/src/lib.rs
+2
futures-util/src/lib.rs
+1
futures-util/src/maybe_done.rs
+1
futures-util/src/maybe_done.rs
+55
futures-util/src/poll_fn.rs
+55
futures-util/src/poll_fn.rs
···
···
1
+
use core::fmt;
2
+
use std::task::Poll;
3
+
4
+
use futures_core::{ScopedFuture, Wake};
5
+
6
+
use crate::assert_future;
7
+
8
+
/// Future for the [`poll_fn`] function.
9
+
#[must_use = "futures do nothing unless you `.await` or poll them"]
10
+
pub struct PollFn<F> {
11
+
f: F,
12
+
}
13
+
14
+
/// Creates a new future wrapping around a function returning [`Poll`].
15
+
///
16
+
/// Polling the returned future delegates to the wrapped function.
17
+
///
18
+
/// # Examples
19
+
///
20
+
/// ```
21
+
/// # futures::executor::block_on(async {
22
+
/// use futures::future::poll_fn;
23
+
/// use futures::task::{Context, Poll};
24
+
///
25
+
/// fn read_line(_cx: &mut Context<'_>) -> Poll<String> {
26
+
/// Poll::Ready("Hello, World!".into())
27
+
/// }
28
+
///
29
+
/// let read_future = poll_fn(read_line);
30
+
/// assert_eq!(read_future.await, "Hello, World!".to_owned());
31
+
/// # });
32
+
/// ```
33
+
pub fn poll_fn<'scope, T, F>(f: F) -> PollFn<F>
34
+
where
35
+
F: Fn(&'scope dyn Wake) -> Poll<T>,
36
+
{
37
+
assert_future::<T, _>(PollFn { f })
38
+
}
39
+
40
+
impl<F> fmt::Debug for PollFn<F> {
41
+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42
+
f.debug_struct("PollFn").finish()
43
+
}
44
+
}
45
+
46
+
impl<'scope, T, F> ScopedFuture<'scope> for PollFn<F>
47
+
where
48
+
F: Fn(&'scope dyn Wake) -> Poll<T>,
49
+
{
50
+
type Output = T;
51
+
52
+
fn poll(&self, wake: &'scope dyn Wake) -> Poll<T> {
53
+
(&self.f)(wake)
54
+
}
55
+
}