+102
-3
Cargo.lock
+102
-3
Cargo.lock
···
39
"futures-combinators",
40
"futures-compat",
41
"futures-derive",
42
"futures-util",
43
]
44
45
[[package]]
46
name = "futures-combinators"
47
version = "0.0.2"
48
dependencies = [
49
-
"futures-util",
50
]
51
52
[[package]]
53
name = "futures-compat"
54
version = "0.0.2"
55
dependencies = [
56
-
"futures-core",
57
"lifetime-guard",
58
]
59
···
62
version = "0.0.2"
63
64
[[package]]
65
name = "futures-derive"
66
version = "0.0.2"
67
dependencies = [
···
71
]
72
73
[[package]]
74
name = "futures-util"
75
-
version = "0.0.2"
76
77
[[package]]
78
name = "generator"
···
169
version = "0.2.16"
170
source = "registry+https://github.com/rust-lang/crates.io-index"
171
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
172
173
[[package]]
174
name = "proc-macro2"
···
258
version = "1.3.0"
259
source = "registry+https://github.com/rust-lang/crates.io-index"
260
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
261
262
[[package]]
263
name = "smallvec"
···
39
"futures-combinators",
40
"futures-compat",
41
"futures-derive",
42
+
]
43
+
44
+
[[package]]
45
+
name = "futures"
46
+
version = "0.3.31"
47
+
source = "registry+https://github.com/rust-lang/crates.io-index"
48
+
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
49
+
dependencies = [
50
+
"futures-channel",
51
+
"futures-core 0.3.31",
52
+
"futures-executor",
53
+
"futures-io",
54
+
"futures-sink",
55
+
"futures-task",
56
"futures-util",
57
]
58
59
[[package]]
60
+
name = "futures-channel"
61
+
version = "0.3.31"
62
+
source = "registry+https://github.com/rust-lang/crates.io-index"
63
+
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
64
+
dependencies = [
65
+
"futures-core 0.3.31",
66
+
"futures-sink",
67
+
]
68
+
69
+
[[package]]
70
name = "futures-combinators"
71
version = "0.0.2"
72
dependencies = [
73
+
"futures 0.3.31",
74
+
"futures-compat",
75
+
"futures-core 0.0.2",
76
+
"lifetime-guard",
77
]
78
79
[[package]]
80
name = "futures-compat"
81
version = "0.0.2"
82
dependencies = [
83
+
"futures-core 0.0.2",
84
"lifetime-guard",
85
]
86
···
89
version = "0.0.2"
90
91
[[package]]
92
+
name = "futures-core"
93
+
version = "0.3.31"
94
+
source = "registry+https://github.com/rust-lang/crates.io-index"
95
+
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
96
+
97
+
[[package]]
98
name = "futures-derive"
99
version = "0.0.2"
100
dependencies = [
···
104
]
105
106
[[package]]
107
+
name = "futures-executor"
108
+
version = "0.3.31"
109
+
source = "registry+https://github.com/rust-lang/crates.io-index"
110
+
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
111
+
dependencies = [
112
+
"futures-core 0.3.31",
113
+
"futures-task",
114
+
"futures-util",
115
+
]
116
+
117
+
[[package]]
118
+
name = "futures-io"
119
+
version = "0.3.31"
120
+
source = "registry+https://github.com/rust-lang/crates.io-index"
121
+
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
122
+
123
+
[[package]]
124
+
name = "futures-macro"
125
+
version = "0.3.31"
126
+
source = "registry+https://github.com/rust-lang/crates.io-index"
127
+
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
128
+
dependencies = [
129
+
"proc-macro2",
130
+
"quote",
131
+
"syn",
132
+
]
133
+
134
+
[[package]]
135
+
name = "futures-sink"
136
+
version = "0.3.31"
137
+
source = "registry+https://github.com/rust-lang/crates.io-index"
138
+
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
139
+
140
+
[[package]]
141
+
name = "futures-task"
142
+
version = "0.3.31"
143
+
source = "registry+https://github.com/rust-lang/crates.io-index"
144
+
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
145
+
146
+
[[package]]
147
name = "futures-util"
148
+
version = "0.3.31"
149
+
source = "registry+https://github.com/rust-lang/crates.io-index"
150
+
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
151
+
dependencies = [
152
+
"futures-channel",
153
+
"futures-core 0.3.31",
154
+
"futures-io",
155
+
"futures-macro",
156
+
"futures-sink",
157
+
"futures-task",
158
+
"memchr",
159
+
"pin-project-lite",
160
+
"pin-utils",
161
+
"slab",
162
+
]
163
164
[[package]]
165
name = "generator"
···
256
version = "0.2.16"
257
source = "registry+https://github.com/rust-lang/crates.io-index"
258
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
259
+
260
+
[[package]]
261
+
name = "pin-utils"
262
+
version = "0.1.0"
263
+
source = "registry+https://github.com/rust-lang/crates.io-index"
264
+
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
265
266
[[package]]
267
name = "proc-macro2"
···
351
version = "1.3.0"
352
source = "registry+https://github.com/rust-lang/crates.io-index"
353
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
354
+
355
+
[[package]]
356
+
name = "slab"
357
+
version = "0.4.11"
358
+
source = "registry+https://github.com/rust-lang/crates.io-index"
359
+
checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589"
360
361
[[package]]
362
name = "smallvec"
+4
-3
Cargo.toml
+4
-3
Cargo.toml
···
1
[workspace]
2
resolver = "3"
3
-
members = [ "futures", "futures-combinators", "futures-compat", "futures-core", "futures-derive", "futures-util", "lifetime-guard"]
4
5
[workspace.package]
6
version = "0.0.2"
···
12
homepage = "https://github.com/AngleSideAngle/bcsc"
13
14
[workspace.dependencies]
15
-
futures = { path = "futures", version = "0.0.2" }
16
futures-combinators = { path = "futures-combinators", version = "0.0.2" }
17
futures-compat = { path = "futures-compat", version = "0.0.2" }
18
futures-core = { path = "futures-core", version = "0.0.2" }
19
futures-derive = { path = "futures-derive", version = "0.0.2" }
20
-
futures-util = { path = "futures-util", version = "0.0.2" }
21
lifetime-guard = { path = "lifetime-guard", version = "0.0.2" }
···
1
[workspace]
2
resolver = "3"
3
+
members = [ "futures", "futures-combinators", "futures-compat", "futures-core", "futures-derive", "lifetime-guard"]
4
5
[workspace.package]
6
version = "0.0.2"
···
12
homepage = "https://github.com/AngleSideAngle/bcsc"
13
14
[workspace.dependencies]
15
+
# futures = { path = "futures", version = "0.0.2" }
16
futures-combinators = { path = "futures-combinators", version = "0.0.2" }
17
futures-compat = { path = "futures-compat", version = "0.0.2" }
18
futures-core = { path = "futures-core", version = "0.0.2" }
19
futures-derive = { path = "futures-derive", version = "0.0.2" }
20
lifetime-guard = { path = "lifetime-guard", version = "0.0.2" }
21
+
# TODO probably vendor in
22
+
futures = "0.3"
+4
-1
futures-combinators/Cargo.toml
+4
-1
futures-combinators/Cargo.toml
+106
-88
futures-combinators/src/join.rs
+106
-88
futures-combinators/src/join.rs
···
1
-
use futures_comp
2
-
use futures_util::WakeStore;
3
-
use futures_util::{MaybeDone, maybe_done};
4
-
use std::{cell::Cell, task::Poll};
5
6
/// from [futures-concurrency](https://github.com/yoshuawuyts/futures-concurrency/tree/main)
7
/// Wait for all futures to complete.
···
13
type Output;
14
15
/// The [`ScopedFuture`] implementation returned by this method.
16
-
type Future: Future<Output = Self::Output>;
17
18
/// Waits for multiple futures to complete.
19
///
···
24
fn join(self) -> Self::Future;
25
}
26
27
-
pub trait JoinExt<'scope> {
28
-
fn along_with<Fut>(self, other: Fut) -> Join2<'scope, Self, Fut>
29
where
30
-
Self: Sized + Future<'scope>,
31
-
Fut: Future<'scope>,
32
{
33
(self, other).join()
34
}
35
}
36
37
-
impl<'scope, T> JoinExt<'scope> for T where T: Future<'scope> {}
38
39
macro_rules! impl_join_tuple {
40
($namespace:ident $StructName:ident $($F:ident)+) => {
41
mod $namespace {
42
-
use super::*;
43
-
44
-
#[allow(non_snake_case)]
45
-
pub struct Wakers<'scope> {
46
-
$(pub $F: WakeStore<'scope>,)*
47
-
}
48
-
49
-
#[allow(non_snake_case)]
50
-
pub struct WakerRefs<'scope> {
51
-
$(pub $F: Cell<Option<&'scope dyn Wake<'scope>>>,)*
52
-
}
53
}
54
55
#[allow(non_snake_case)]
56
#[must_use = "futures do nothing unless you `.await` or poll them"]
57
-
pub struct $StructName<'scope, $($F: ScopedFuture<'scope>),+> {
58
-
$($F: MaybeDone<'scope, $F>,)*
59
-
wakers: $namespace::Wakers<'scope>,
60
-
refs: $namespace::WakerRefs<'scope>,
61
}
62
63
-
impl<'scope, $($F: ScopedFuture<'scope>),+> ScopedFuture<'scope>
64
-
for $StructName<'scope, $($F),+>
65
{
66
type Output = ($($F::Output),+);
67
68
-
fn poll(&'scope self, wake: &'scope dyn Wake<'scope>) -> Poll<Self::Output> {
69
let mut ready = true;
70
71
$(
72
-
self.wakers.$F.set_parent(wake);
73
-
self.refs.$F.replace(Some(&self.wakers.$F));
74
75
-
if !self.$F.is_done() {
76
-
ready &= if self.wakers.$F.take_ready() {
77
-
// By polling the future, we create our self-referential structure for lifetime `'scope`.
78
-
//
79
-
// SAFETY:
80
-
// `unwrap_unchecked` is safe because we just inserted `Some` into `refs.$F`,
81
-
// so it is guaranteed to be `Some`.
82
-
self.$F
83
-
.poll(unsafe { (&self.refs.$F.get()).unwrap_unchecked() })
84
-
.is_ready()
85
-
} else {
86
-
false
87
-
};
88
-
}
89
)+
90
91
if ready {
···
96
// Once a future is not `MaybeDoneState::Future`, it transitions to `Done`,
97
// so we know the result of `take_output` must be `Some`.
98
unsafe {
99
-
self.$F.take_output().unwrap_unchecked()
100
},
101
)*
102
))
···
106
}
107
}
108
109
-
impl<'scope, $($F: ScopedFuture<'scope>),+> Join<'scope> for ($($F),+) {
110
type Output = ($($F::Output),*);
111
-
type Future = $StructName<'scope, $($F),+>;
112
113
#[allow(non_snake_case)]
114
fn join(self) -> Self::Future {
115
let ($($F),+) = self;
116
117
$StructName {
118
-
$($F: maybe_done($F),)*
119
-
wakers: $namespace::Wakers {
120
-
$($F: WakeStore::new(),)*
121
-
},
122
-
refs: $namespace::WakerRefs {
123
-
$($F: Option::None.into(),)*
124
-
},
125
}
126
}
127
}
···
144
mod tests {
145
#![no_std]
146
147
-
use futures_util::{noop_wake, poll_fn};
148
149
use super::*;
150
151
#[test]
152
fn counters() {
153
-
let x1 = Cell::new(0);
154
-
let x2 = Cell::new(0);
155
-
let f1 = poll_fn(|wake| {
156
-
wake.register();
157
-
x1.set(x1.get() + 1);
158
-
if x1.get() == 4 {
159
-
Poll::Ready(x1.get())
160
} else {
161
Poll::Pending
162
}
163
});
164
-
let f2 = poll_fn(|wake| {
165
-
wake.register();
166
-
x2.set(x2.get() + 1);
167
-
if x2.get() == 5 {
168
-
Poll::Ready(x2.get())
169
} else {
170
Poll::Pending
171
}
172
});
173
-
let dummy_waker = noop_wake();
174
-
let join = (f1, f2).join();
175
for _ in 0..4 {
176
-
assert_eq!(join.poll(&dummy_waker), Poll::Pending);
177
}
178
-
assert_eq!(join.poll(&dummy_waker), Poll::Ready((4, 5)));
179
}
180
181
-
#[test]
182
-
fn never_wake() {
183
-
let f1 = poll_fn(|_| Poll::<i32>::Pending);
184
-
let f2 = poll_fn(|_| Poll::<i32>::Pending);
185
-
let dummy_waker = noop_wake();
186
-
let join = (f1, f2).join();
187
-
for _ in 0..10 {
188
-
assert_eq!(join.poll(&dummy_waker), Poll::Pending);
189
-
}
190
-
}
191
192
-
#[test]
193
-
fn basic() {
194
-
let f1 = poll_fn(|_| Poll::Ready(1));
195
-
let f2 = poll_fn(|_| Poll::Ready(2));
196
-
let dummy_waker = noop_wake();
197
-
assert_eq!(f1.along_with(f2).poll(&dummy_waker), Poll::Ready((1, 2)));
198
-
}
199
}
···
1
+
use crate::wake::WakeArray;
2
+
use futures::future::FusedFuture;
3
+
use futures::future::MaybeDone;
4
+
use futures::future::maybe_done;
5
+
use futures_compat::BespokeFutureWrapper;
6
+
use std::pin::Pin;
7
+
use std::task::Context;
8
+
use std::task::Poll;
9
10
/// from [futures-concurrency](https://github.com/yoshuawuyts/futures-concurrency/tree/main)
11
/// Wait for all futures to complete.
···
17
type Output;
18
19
/// The [`ScopedFuture`] implementation returned by this method.
20
+
type Future: futures_core::Future<Output = Self::Output>;
21
22
/// Waits for multiple futures to complete.
23
///
···
28
fn join(self) -> Self::Future;
29
}
30
31
+
pub trait JoinExt {
32
+
fn along_with<Fut>(self, other: Fut) -> Join2<Self, Fut>
33
where
34
+
Self: Sized + futures_core::Future,
35
+
Fut: futures_core::Future,
36
{
37
(self, other).join()
38
}
39
}
40
41
+
impl<T> JoinExt for T where T: futures_core::Future {}
42
43
macro_rules! impl_join_tuple {
44
($namespace:ident $StructName:ident $($F:ident)+) => {
45
mod $namespace {
46
+
#[repr(u8)]
47
+
pub(super) enum Indexes { $($F,)+ }
48
+
pub(super) const LEN: usize = [$(Indexes::$F,)+].len();
49
}
50
51
#[allow(non_snake_case)]
52
#[must_use = "futures do nothing unless you `.await` or poll them"]
53
+
pub struct $StructName<$($F: futures_core::Future),+> {
54
+
$($F: MaybeDone<BespokeFutureWrapper<$F>>,)*
55
+
wake_array: WakeArray<{$namespace::LEN}>,
56
}
57
58
+
impl<$($F: futures_core::Future),+> futures_core::Future for $StructName<$($F),+>
59
{
60
type Output = ($($F::Output),+);
61
62
+
#[allow(non_snake_case)]
63
+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
64
+
let this = unsafe { self.get_unchecked_mut() };
65
+
66
+
let wake_array = unsafe { Pin::new_unchecked(&this.wake_array) };
67
+
$(
68
+
// TODO debug_assert_matches is nightly https://github.com/rust-lang/rust/issues/82775
69
+
debug_assert!(!matches!(this.$F, MaybeDone::Gone), "do not poll futures after they return Poll::Ready");
70
+
let mut $F = unsafe { Pin::new_unchecked(&mut this.$F) };
71
+
)+
72
+
73
+
// extract reference to ValueGuard from Context
74
+
// this is safe because futures_core::Future are isolated
75
+
// from core::future::Future impls and guaranteed to have
76
+
// their cx.wakers represented in the nonstandard format
77
+
unsafe { wake_array.register_parent(futures_compat::waker_to_guard(cx.waker())) }
78
+
79
let mut ready = true;
80
81
$(
82
+
let index = $namespace::Indexes::$F as usize;
83
+
// cx to feed children
84
+
let waker = unsafe { futures_compat::guard_to_waker(wake_array.child_guard_ptr(index).unwrap_unchecked()) };
85
+
let mut child_cx = Context::from_waker(&waker);
86
87
+
// ready if MaybeDone is Done or just completed (converted to Done)
88
+
// unsafe / against Future api contract to poll after Gone/Future is finished
89
+
ready &= if unsafe { wake_array.take_woken(index).unwrap_unchecked() } {
90
+
$F.as_mut().poll(&mut child_cx).is_ready()
91
+
} else {
92
+
$F.is_terminated()
93
+
};
94
)+
95
96
if ready {
···
101
// Once a future is not `MaybeDoneState::Future`, it transitions to `Done`,
102
// so we know the result of `take_output` must be `Some`.
103
unsafe {
104
+
$F.take_output().unwrap_unchecked()
105
},
106
)*
107
))
···
111
}
112
}
113
114
+
impl<$($F: futures_core::Future),+> Join for ($($F),+) {
115
type Output = ($($F::Output),*);
116
+
type Future = $StructName<$($F),+>;
117
118
#[allow(non_snake_case)]
119
fn join(self) -> Self::Future {
120
let ($($F),+) = self;
121
122
$StructName {
123
+
$($F: maybe_done(unsafe { futures_compat::bespoke_future_to_std($F) }),)*
124
+
wake_array: WakeArray::new(),
125
}
126
}
127
}
···
144
mod tests {
145
#![no_std]
146
147
+
use futures_core::{Future, Wake};
148
+
use lifetime_guard::guard::ValueGuard;
149
+
150
+
use crate::wake::{DummyWaker, wake_bespoke_waker};
151
152
use super::*;
153
+
use std::cell::Cell;
154
+
use std::future::poll_fn;
155
+
use std::pin;
156
+
use std::ptr::NonNull;
157
158
#[test]
159
fn counters() {
160
+
let mut x1 = 0;
161
+
let mut x2 = 0;
162
+
let f1 = poll_fn(|cx| {
163
+
unsafe { wake_bespoke_waker(cx.waker()) };
164
+
x1 += 1;
165
+
if x1 == 4 {
166
+
Poll::Ready(x1)
167
} else {
168
Poll::Pending
169
}
170
});
171
+
let f2 = poll_fn(|cx| {
172
+
unsafe { wake_bespoke_waker(cx.waker()) };
173
+
x2 += 1;
174
+
if x2 == 5 {
175
+
Poll::Ready(x2)
176
} else {
177
Poll::Pending
178
}
179
});
180
+
let guard = pin::pin!(ValueGuard::new(NonNull::new(
181
+
&mut DummyWaker as *mut dyn Wake,
182
+
)));
183
+
let waker = unsafe { futures_compat::guard_to_waker(guard.as_ref()) };
184
+
let mut cx = Context::from_waker(&waker);
185
+
let mut join = unsafe {
186
+
(
187
+
futures_compat::std_future_to_bespoke(f1),
188
+
futures_compat::std_future_to_bespoke(f2),
189
+
)
190
+
}
191
+
.join();
192
+
let mut pinned = unsafe { Pin::new_unchecked(&mut join) };
193
for _ in 0..4 {
194
+
assert_eq!(pinned.as_mut().poll(&mut cx), Poll::Pending);
195
}
196
+
assert_eq!(pinned.poll(&mut cx), Poll::Ready((4, 5)));
197
}
198
199
+
// #[test]
200
+
// fn never_wake() {
201
+
// let f1 = poll_fn(|_| Poll::<i32>::Pending);
202
+
// let f2 = poll_fn(|_| Poll::<i32>::Pending);
203
+
// let dummy_waker = noop_wake();
204
+
// let join = (f1, f2).join();
205
+
// for _ in 0..10 {
206
+
// assert_eq!(join.poll(&dummy_waker), Poll::Pending);
207
+
// }
208
+
// }
209
210
+
// #[test]
211
+
// fn basic() {
212
+
// let f1 = poll_fn(|_| Poll::Ready(1));
213
+
// let f2 = poll_fn(|_| Poll::Ready(2));
214
+
// let dummy_waker = noop_wake();
215
+
// assert_eq!(f1.along_with(f2).poll(&dummy_waker), Poll::Ready((1, 2)));
216
+
// }
217
}
+4
-3
futures-combinators/src/lib.rs
+4
-3
futures-combinators/src/lib.rs
+113
futures-combinators/src/wake.rs
+113
futures-combinators/src/wake.rs
···
···
1
+
use std::{
2
+
array, cell::Cell, marker::PhantomPinned, pin::Pin, ptr::NonNull,
3
+
task::Context,
4
+
};
5
+
6
+
use futures_compat::WakePtr;
7
+
use futures_core::Wake;
8
+
use lifetime_guard::{guard::RefGuard, guard::ValueGuard};
9
+
10
+
pub struct WakeArray<const N: usize> {
11
+
parent: RefGuard<WakePtr>,
12
+
children: [ValueGuard<WakePtr>; N],
13
+
stores: [WakeStore; N],
14
+
marker: PhantomPinned,
15
+
}
16
+
17
+
impl<const N: usize> WakeArray<N> {
18
+
pub fn new() -> Self {
19
+
Self {
20
+
parent: RefGuard::new(),
21
+
children: array::from_fn(|_| ValueGuard::new(None)),
22
+
stores: array::from_fn(|_| WakeStore::new()),
23
+
marker: PhantomPinned,
24
+
}
25
+
}
26
+
27
+
pub fn register_parent(
28
+
self: Pin<&Self>,
29
+
parent: Pin<&ValueGuard<WakePtr>>,
30
+
) {
31
+
unsafe { Pin::new_unchecked(&self.parent) }.register(parent);
32
+
}
33
+
34
+
/// Returns pinned reference to child ValueGuard
35
+
/// returns None if n is not in 0..N
36
+
pub fn child_guard_ptr(
37
+
self: Pin<&Self>,
38
+
index: usize,
39
+
) -> Option<Pin<&ValueGuard<WakePtr>>> {
40
+
// TODO remove bounds checking, break api when https://github.com/rust-lang/rust/issues/123646
41
+
if index >= N {
42
+
return None;
43
+
}
44
+
45
+
let wake_store = unsafe {
46
+
NonNull::new_unchecked(self.stores.get(index).unwrap_unchecked()
47
+
as *const dyn Wake
48
+
as *mut dyn Wake)
49
+
};
50
+
let child_guard =
51
+
unsafe { self.get_ref().children.get(index).unwrap_unchecked() };
52
+
53
+
child_guard.set(Some(wake_store));
54
+
55
+
Some(unsafe { Pin::new_unchecked(child_guard) })
56
+
}
57
+
58
+
pub fn take_woken(self: Pin<&Self>, index: usize) -> Option<bool> {
59
+
self.stores.get(index).map(|store| store.take_woken())
60
+
}
61
+
}
62
+
63
+
pub struct WakeStore {
64
+
wake_parent: Cell<Option<NonNull<RefGuard<WakePtr>>>>,
65
+
activated: Cell<bool>,
66
+
}
67
+
68
+
impl WakeStore {
69
+
pub fn new() -> Self {
70
+
Self {
71
+
wake_parent: Cell::new(None),
72
+
activated: Cell::new(false),
73
+
}
74
+
}
75
+
76
+
pub fn set_parent(&self, parent: &RefGuard<WakePtr>) {
77
+
self.wake_parent.set(Some(parent.into()));
78
+
}
79
+
80
+
pub fn take_woken(&self) -> bool {
81
+
self.activated.replace(false)
82
+
}
83
+
}
84
+
85
+
impl Wake for WakeStore {
86
+
fn wake(&self) {
87
+
self.activated.set(true);
88
+
if let Some(parent) = self
89
+
.wake_parent
90
+
.get()
91
+
.map(|guard_ptr| unsafe { &*guard_ptr.as_ptr() })
92
+
.and_then(|guard| guard.get())
93
+
.flatten()
94
+
{
95
+
unsafe { &*parent.as_ptr() }.wake();
96
+
}
97
+
}
98
+
}
99
+
100
+
pub unsafe fn wake_bespoke_waker(waker: &std::task::Waker) {
101
+
unsafe {
102
+
let guard = futures_compat::waker_to_guard(waker);
103
+
if let Some(wake) = guard.get() {
104
+
(*wake.as_ptr()).wake();
105
+
}
106
+
}
107
+
}
108
+
109
+
pub struct DummyWaker;
110
+
111
+
impl Wake for DummyWaker {
112
+
fn wake(&self) {}
113
+
}
+43
-21
futures-compat/src/lib.rs
+43
-21
futures-compat/src/lib.rs
···
3
4
use std::{
5
pin::Pin,
6
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
7
};
8
9
-
use lifetime_guard::atomic_guard::AtomicValueGuard;
10
11
static EVIL_VTABLE: RawWakerVTable = RawWakerVTable::new(
12
|_| panic!("wtf"),
···
15
|_| panic!("wtf"),
16
);
17
18
-
/// Coerces a pinned `AtomicValueGuard` reference to a `Waker` for use in
19
/// `core::future::Future`
20
///
21
/// Any usage or storage of the resulting `Waker` is undefined behavior.
22
-
pub unsafe fn guard_to_waker(guard: Pin<&AtomicValueGuard<fn()>>) -> Waker {
23
unsafe {
24
Waker::from_raw(RawWaker::new(
25
-
guard.get_ref() as *const AtomicValueGuard<fn()> as *const (),
26
&EVIL_VTABLE,
27
))
28
}
···
32
///
33
/// This should only be used to undo the work of `guard_to_waker`.
34
pub unsafe fn waker_to_guard<'a>(
35
-
waker: Waker,
36
-
) -> Pin<&'a AtomicValueGuard<fn()>> {
37
unsafe {
38
-
Pin::new_unchecked(&*(waker.data() as *const AtomicValueGuard<fn()>))
39
}
40
}
41
42
/// wraps `core::future::Future` in impl of `bcsc:Future`
43
#[repr(transparent)]
···
51
}
52
}
53
54
-
impl<F: core::future::Future> NormalFutureWrapper<F> {
55
-
pub unsafe fn from_std_future(future: F) -> Self {
56
-
Self(future)
57
-
}
58
-
}
59
-
60
/// wraps custom `bcsc::Future` in impl of `core::future::Future`
61
#[repr(transparent)]
62
-
pub struct CustomFutureWrapper<F: futures_core::Future>(F);
63
64
-
impl<F: futures_core::Future> core::future::Future for CustomFutureWrapper<F> {
65
type Output = F::Output;
66
67
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
68
unsafe { self.map_unchecked_mut(|this| &mut this.0).poll(cx) }
69
}
70
}
71
-
72
-
impl<F: futures_core::Future> CustomFutureWrapper<F> {
73
-
pub unsafe fn from_custom_future(future: F) -> Self {
74
-
Self(future)
75
-
}
76
-
}
···
3
4
use std::{
5
pin::Pin,
6
+
ptr::NonNull,
7
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
8
};
9
10
+
use futures_core::Wake;
11
+
use lifetime_guard::{atomic_guard::AtomicValueGuard, guard::ValueGuard};
12
13
static EVIL_VTABLE: RawWakerVTable = RawWakerVTable::new(
14
|_| panic!("wtf"),
···
17
|_| panic!("wtf"),
18
);
19
20
+
pub type WakePtr = Option<NonNull<dyn Wake>>;
21
+
22
+
/// Coerces a pinned `ValueGuard` reference to a `Waker` for use in
23
/// `core::future::Future`
24
///
25
/// Any usage or storage of the resulting `Waker` is undefined behavior.
26
+
pub unsafe fn guard_to_waker(guard: Pin<&ValueGuard<WakePtr>>) -> Waker {
27
unsafe {
28
Waker::from_raw(RawWaker::new(
29
+
guard.get_ref() as *const ValueGuard<WakePtr> as *const (),
30
+
&EVIL_VTABLE,
31
+
))
32
+
}
33
+
}
34
+
pub unsafe fn atomic_guard_to_waker(
35
+
guard: Pin<&AtomicValueGuard<WakePtr>>,
36
+
) -> Waker {
37
+
unsafe {
38
+
Waker::from_raw(RawWaker::new(
39
+
guard.get_ref() as *const AtomicValueGuard<WakePtr> as *const (),
40
&EVIL_VTABLE,
41
))
42
}
···
46
///
47
/// This should only be used to undo the work of `guard_to_waker`.
48
pub unsafe fn waker_to_guard<'a>(
49
+
waker: &Waker,
50
+
) -> Pin<&'a ValueGuard<WakePtr>> {
51
unsafe {
52
+
Pin::new_unchecked(&*(waker.data() as *const ValueGuard<WakePtr>))
53
}
54
}
55
+
pub unsafe fn waker_to_atomic_guard<'a>(
56
+
waker: &Waker,
57
+
) -> Pin<&'a AtomicValueGuard<WakePtr>> {
58
+
unsafe {
59
+
Pin::new_unchecked(&*(waker.data() as *const AtomicValueGuard<WakePtr>))
60
+
}
61
+
}
62
+
63
+
// TODO should probably return impl futures_core::Future, same for next fn
64
+
pub unsafe fn std_future_to_bespoke<F: core::future::Future>(
65
+
future: F,
66
+
) -> NormalFutureWrapper<F> {
67
+
NormalFutureWrapper(future)
68
+
}
69
+
70
+
pub unsafe fn bespoke_future_to_std<F: futures_core::Future>(
71
+
future: F,
72
+
) -> BespokeFutureWrapper<F> {
73
+
BespokeFutureWrapper(future)
74
+
}
75
76
/// wraps `core::future::Future` in impl of `bcsc:Future`
77
#[repr(transparent)]
···
85
}
86
}
87
88
/// wraps custom `bcsc::Future` in impl of `core::future::Future`
89
#[repr(transparent)]
90
+
pub struct BespokeFutureWrapper<F: futures_core::Future>(F);
91
92
+
impl<F: futures_core::Future> core::future::Future for BespokeFutureWrapper<F> {
93
type Output = F::Output;
94
95
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
96
unsafe { self.map_unchecked_mut(|this| &mut this.0).poll(cx) }
97
}
98
}
+5
futures-core/src/lib.rs
+5
futures-core/src/lib.rs
-11
futures-util/Cargo.toml
-11
futures-util/Cargo.toml
-14
futures-util/src/ext.rs
-14
futures-util/src/ext.rs
-18
futures-util/src/lib.rs
-18
futures-util/src/lib.rs
···
1
-
mod ext;
2
-
mod maybe_done;
3
-
mod poll_fn;
4
-
mod wakers;
5
-
6
-
use futures_core::ScopedFuture;
7
-
pub use maybe_done::*;
8
-
pub use poll_fn::poll_fn;
9
-
pub use wakers::*;
10
-
11
-
// Just a helper function to ensure the futures we're returning all have the
12
-
// right implementations.
13
-
pub(crate) fn assert_future<'scope, T, F>(future: F) -> F
14
-
where
15
-
F: ScopedFuture<'scope, Output = T>,
16
-
{
17
-
future
18
-
}
···
-147
futures-util/src/maybe_done.rs
-147
futures-util/src/maybe_done.rs
···
1
-
//! Definition of the MaybeDone combinator
2
-
3
-
use futures_core::{ScopedFuture, Wake};
4
-
5
-
use super::assert_future;
6
-
use std::{
7
-
cell::UnsafeCell,
8
-
task::{Poll, ready},
9
-
};
10
-
11
-
#[must_use = "futures do nothing unless you `.await` or poll them"]
12
-
pub struct MaybeDone<'scope, Fut: ScopedFuture<'scope>> {
13
-
state: UnsafeCell<MaybeDoneState<'scope, Fut>>,
14
-
}
15
-
16
-
/// A future that may have completed.
17
-
///
18
-
/// This is created by the [`maybe_done()`] function.
19
-
#[derive(Debug)]
20
-
pub enum MaybeDoneState<'scope, Fut: ScopedFuture<'scope>> {
21
-
/// A not-yet-completed future
22
-
Future(Fut),
23
-
/// The output of the completed future
24
-
Done(Fut::Output),
25
-
/// The empty variant after the result of a [`MaybeDone`] has been
26
-
/// taken using the [`take_output`](MaybeDone::take_output) method.
27
-
Gone,
28
-
}
29
-
30
-
/// Wraps a future into a `MaybeDone`
31
-
///
32
-
///
33
-
pub fn maybe_done<'scope, Fut: ScopedFuture<'scope>>(
34
-
future: Fut,
35
-
) -> MaybeDone<'scope, Fut> {
36
-
assert_future::<(), _>(MaybeDone {
37
-
state: MaybeDoneState::Future(future).into(),
38
-
})
39
-
}
40
-
41
-
impl<'scope, Fut: ScopedFuture<'scope>> MaybeDone<'scope, Fut> {
42
-
/// Attempt to take the output of a `MaybeDone` without driving it
43
-
/// towards completion.
44
-
#[inline]
45
-
pub fn take_output(&self) -> Option<Fut::Output> {
46
-
match unsafe { &*self.state.get() } {
47
-
MaybeDoneState::Done(_) => {}
48
-
MaybeDoneState::Future(_) | MaybeDoneState::Gone => return None,
49
-
}
50
-
match unsafe { self.state.get().replace(MaybeDoneState::Gone) } {
51
-
MaybeDoneState::Done(output) => Some(output),
52
-
_ => unreachable!(),
53
-
}
54
-
}
55
-
56
-
/// Returns an immutable reference to the internal state of this future
57
-
///
58
-
/// # Safety
59
-
/// You must not hold this reference past any use of any other methods of this struct
60
-
pub unsafe fn get_state(&self) -> &MaybeDoneState<'scope, Fut> {
61
-
unsafe { &*self.state.get() }
62
-
}
63
-
64
-
pub fn is_done(&self) -> bool {
65
-
match unsafe { &*self.state.get() } {
66
-
MaybeDoneState::Future(_) => false,
67
-
MaybeDoneState::Done(_) | MaybeDoneState::Gone => true,
68
-
}
69
-
}
70
-
}
71
-
72
-
impl<'scope, Fut: ScopedFuture<'scope>> ScopedFuture<'scope>
73
-
for MaybeDone<'scope, Fut>
74
-
{
75
-
type Output = ();
76
-
77
-
fn poll(&'scope self, cx: &'scope dyn Wake<'scope>) -> Poll<Self::Output> {
78
-
match unsafe { &*self.state.get() } {
79
-
MaybeDoneState::Future(f) => {
80
-
let res = ready!(f.poll(cx));
81
-
// this is fine because no immutable references currently exist
82
-
unsafe { self.state.get().replace(MaybeDoneState::Done(res)) };
83
-
}
84
-
MaybeDoneState::Done(_) => {}
85
-
MaybeDoneState::Gone => {
86
-
panic!("MaybeDone polled after value taken")
87
-
}
88
-
}
89
-
Poll::Ready(())
90
-
}
91
-
}
92
-
93
-
#[cfg(test)]
94
-
mod tests {
95
-
use std::cell::Cell;
96
-
97
-
use crate::poll_fn;
98
-
99
-
use crate::noop_wake;
100
-
101
-
use super::*;
102
-
103
-
#[test]
104
-
fn immediate_return() {
105
-
let immediate = poll_fn(|_| Poll::Ready(1));
106
-
let future = maybe_done(immediate);
107
-
let wake = noop_wake();
108
-
match unsafe { future.get_state() } {
109
-
MaybeDoneState::Future(_) => {}
110
-
MaybeDoneState::Done(_) | MaybeDoneState::Gone => {
111
-
panic!("should be MaybeDoneState::Future")
112
-
}
113
-
}
114
-
assert_eq!(future.poll(&wake), Poll::Ready(()));
115
-
match unsafe { future.get_state() } {
116
-
MaybeDoneState::Done(_) => {}
117
-
MaybeDoneState::Future(_) | MaybeDoneState::Gone => {
118
-
panic!("should be MaybeDoneState::Done")
119
-
}
120
-
}
121
-
assert_eq!(future.take_output(), Some(1));
122
-
assert_eq!(future.take_output(), None);
123
-
}
124
-
125
-
#[test]
126
-
fn normal() {
127
-
let x = Cell::new(0);
128
-
let poll = poll_fn(|wake| {
129
-
wake.wake();
130
-
x.set(x.get() + 1);
131
-
if x.get() == 4 {
132
-
Poll::Ready(x.get())
133
-
} else {
134
-
Poll::Pending
135
-
}
136
-
});
137
-
let future = maybe_done(poll);
138
-
let noop = noop_wake();
139
-
for _ in 0..3 {
140
-
assert_eq!(future.poll(&noop), Poll::Pending);
141
-
assert_eq!(future.take_output(), None);
142
-
}
143
-
assert_eq!(future.poll(&noop), Poll::Ready(()));
144
-
assert_eq!(future.poll(&noop), Poll::Ready(()));
145
-
assert_eq!(future.take_output(), Some(4));
146
-
}
147
-
}
···
-39
futures-util/src/poll_fn.rs
-39
futures-util/src/poll_fn.rs
···
1
-
use core::fmt;
2
-
use std::task::Poll;
3
-
4
-
use futures_core::{ScopedFuture, Wake};
5
-
6
-
use crate::assert_future;
7
-
8
-
/// Future for the [`poll_fn`] function.
9
-
#[must_use = "futures do nothing unless you `.await` or poll them"]
10
-
pub struct PollFn<F> {
11
-
f: F,
12
-
}
13
-
14
-
/// Creates a new future wrapping around a function returning [`Poll`].
15
-
///
16
-
/// Polling the returned future delegates to the wrapped function.
17
-
pub fn poll_fn<'scope, T, F>(f: F) -> PollFn<F>
18
-
where
19
-
F: Fn(&'scope dyn Wake) -> Poll<T>,
20
-
{
21
-
assert_future::<T, _>(PollFn { f })
22
-
}
23
-
24
-
impl<F> fmt::Debug for PollFn<F> {
25
-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26
-
f.debug_struct("PollFn").finish()
27
-
}
28
-
}
29
-
30
-
impl<'scope, T, F> ScopedFuture<'scope> for PollFn<F>
31
-
where
32
-
F: Fn(&'scope dyn Wake) -> Poll<T>,
33
-
{
34
-
type Output = T;
35
-
36
-
fn poll(&self, wake: &'scope dyn Wake) -> Poll<T> {
37
-
(&self.f)(wake)
38
-
}
39
-
}
···
-52
futures-util/src/wakers.rs
-52
futures-util/src/wakers.rs
···
1
-
use std::cell::Cell;
2
-
3
-
use futures_core::Wake;
4
-
5
-
pub struct WakeStore<'scope> {
6
-
parent: Cell<Option<&'scope dyn Wake<'scope>>>,
7
-
ready: Cell<bool>,
8
-
}
9
-
10
-
impl<'scope> WakeStore<'scope> {
11
-
pub fn new() -> Self {
12
-
Self {
13
-
parent: Option::None.into(),
14
-
ready: true.into(),
15
-
}
16
-
}
17
-
18
-
pub fn set_parent(&self, parent: &'scope dyn Wake<'scope>) {
19
-
self.parent.replace(Some(parent));
20
-
}
21
-
22
-
pub fn take_ready(&self) -> bool {
23
-
self.ready.replace(false)
24
-
}
25
-
}
26
-
27
-
impl<'scope> Wake<'scope> for WakeStore<'scope> {
28
-
fn wake(&self) {
29
-
self.ready.replace(true);
30
-
if let Some(parent) = &self.parent.get() {
31
-
parent.wake();
32
-
}
33
-
}
34
-
}
35
-
36
-
pub struct FnWake<F: Fn()>(F);
37
-
38
-
impl<'scope, F: Fn()> Wake<'scope> for FnWake<F> {
39
-
fn wake(&self) {
40
-
self.0()
41
-
}
42
-
}
43
-
44
-
impl<'scope, F: Fn()> From<F> for FnWake<F> {
45
-
fn from(value: F) -> Self {
46
-
FnWake(value)
47
-
}
48
-
}
49
-
50
-
pub fn noop_wake() -> FnWake<fn()> {
51
-
FnWake(|| {})
52
-
}
···
-1
futures/Cargo.toml
-1
futures/Cargo.toml