Compare changes

Choose any two refs to compare.

+2477 -388
+2
.gitignore
··· 13 13 !/futures/** 14 14 !/futures-*/ 15 15 !/futures-*/** 16 + !/lifetime-guard/ 17 + !/lifetime-guard/**
+468 -3
Cargo.lock
··· 3 3 version = 4 4 4 5 5 [[package]] 6 + name = "aho-corasick" 7 + version = "1.1.3" 8 + source = "registry+https://github.com/rust-lang/crates.io-index" 9 + checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" 10 + dependencies = [ 11 + "memchr", 12 + ] 13 + 14 + [[package]] 15 + name = "cc" 16 + version = "1.2.33" 17 + source = "registry+https://github.com/rust-lang/crates.io-index" 18 + checksum = "3ee0f8803222ba5a7e2777dd72ca451868909b1ac410621b676adf07280e9b5f" 19 + dependencies = [ 20 + "shlex", 21 + ] 22 + 23 + [[package]] 24 + name = "cfg-if" 25 + version = "1.0.3" 26 + source = "registry+https://github.com/rust-lang/crates.io-index" 27 + checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" 28 + 29 + [[package]] 30 + name = "critical-section" 31 + version = "1.2.0" 32 + source = "registry+https://github.com/rust-lang/crates.io-index" 33 + checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" 34 + 35 + [[package]] 36 + name = "futures" 37 + version = "0.0.2" 38 + dependencies = [ 39 + "futures-combinators", 40 + "futures-compat", 41 + "futures-core", 42 + "futures-derive", 43 + ] 44 + 45 + [[package]] 6 46 name = "futures-combinators" 7 - version = "0.1.0" 47 + version = "0.0.2" 8 48 dependencies = [ 49 + "futures-compat", 9 50 "futures-core", 10 51 "futures-util", 52 + "lifetime-guard", 53 + ] 54 + 55 + [[package]] 56 + name = "futures-compat" 57 + version = "0.0.2" 58 + dependencies = [ 59 + "futures-core", 60 + "lifetime-guard", 11 61 ] 12 62 13 63 [[package]] 14 64 name = "futures-core" 65 + version = "0.0.2" 66 + 67 + [[package]] 68 + name = "futures-derive" 69 + version = "0.0.2" 70 + dependencies = [ 71 + "proc-macro2", 72 + "quote", 73 + "syn", 74 + ] 75 + 76 + [[package]] 77 + name = "futures-util" 78 + version = "0.0.2" 79 + dependencies = [ 80 + "futures-core", 81 + "lifetime-guard", 82 + ] 83 + 84 + [[package]] 85 + name = "generator" 86 + version = "0.8.5" 87 + source = "registry+https://github.com/rust-lang/crates.io-index" 88 + checksum = "d18470a76cb7f8ff746cf1f7470914f900252ec36bbc40b569d74b1258446827" 89 + dependencies = [ 90 + "cc", 91 + "cfg-if", 92 + "libc", 93 + "log", 94 + "rustversion", 95 + "windows", 96 + ] 97 + 98 + [[package]] 99 + name = "lazy_static" 100 + version = "1.5.0" 101 + source = "registry+https://github.com/rust-lang/crates.io-index" 102 + checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" 103 + 104 + [[package]] 105 + name = "libc" 106 + version = "0.2.175" 107 + source = "registry+https://github.com/rust-lang/crates.io-index" 108 + checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" 109 + 110 + [[package]] 111 + name = "lifetime-guard" 112 + version = "0.0.2" 113 + dependencies = [ 114 + "critical-section", 115 + "loom", 116 + ] 117 + 118 + [[package]] 119 + name = "log" 120 + version = "0.4.27" 121 + source = "registry+https://github.com/rust-lang/crates.io-index" 122 + checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" 123 + 124 + [[package]] 125 + name = "loom" 126 + version = "0.7.2" 127 + source = "registry+https://github.com/rust-lang/crates.io-index" 128 + checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" 129 + dependencies = [ 130 + "cfg-if", 131 + "generator", 132 + "scoped-tls", 133 + "tracing", 134 + "tracing-subscriber", 135 + ] 136 + 137 + [[package]] 138 + name = "matchers" 15 139 version = "0.1.0" 140 + source = "registry+https://github.com/rust-lang/crates.io-index" 141 + checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" 142 + dependencies = [ 143 + "regex-automata 0.1.10", 144 + ] 16 145 17 146 [[package]] 18 - name = "futures-util" 147 + name = "memchr" 148 + version = "2.7.5" 149 + source = "registry+https://github.com/rust-lang/crates.io-index" 150 + checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" 151 + 152 + [[package]] 153 + name = "nu-ansi-term" 154 + version = "0.46.0" 155 + source = "registry+https://github.com/rust-lang/crates.io-index" 156 + checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" 157 + dependencies = [ 158 + "overload", 159 + "winapi", 160 + ] 161 + 162 + [[package]] 163 + name = "once_cell" 164 + version = "1.21.3" 165 + source = "registry+https://github.com/rust-lang/crates.io-index" 166 + checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" 167 + 168 + [[package]] 169 + name = "overload" 170 + version = "0.1.1" 171 + source = "registry+https://github.com/rust-lang/crates.io-index" 172 + checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" 173 + 174 + [[package]] 175 + name = "pin-project-lite" 176 + version = "0.2.16" 177 + source = "registry+https://github.com/rust-lang/crates.io-index" 178 + checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" 179 + 180 + [[package]] 181 + name = "proc-macro2" 182 + version = "1.0.95" 183 + source = "registry+https://github.com/rust-lang/crates.io-index" 184 + checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" 185 + dependencies = [ 186 + "unicode-ident", 187 + ] 188 + 189 + [[package]] 190 + name = "quote" 191 + version = "1.0.40" 192 + source = "registry+https://github.com/rust-lang/crates.io-index" 193 + checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" 194 + dependencies = [ 195 + "proc-macro2", 196 + ] 197 + 198 + [[package]] 199 + name = "regex" 200 + version = "1.11.1" 201 + source = "registry+https://github.com/rust-lang/crates.io-index" 202 + checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" 203 + dependencies = [ 204 + "aho-corasick", 205 + "memchr", 206 + "regex-automata 0.4.9", 207 + "regex-syntax 0.8.5", 208 + ] 209 + 210 + [[package]] 211 + name = "regex-automata" 212 + version = "0.1.10" 213 + source = "registry+https://github.com/rust-lang/crates.io-index" 214 + checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" 215 + dependencies = [ 216 + "regex-syntax 0.6.29", 217 + ] 218 + 219 + [[package]] 220 + name = "regex-automata" 221 + version = "0.4.9" 222 + source = "registry+https://github.com/rust-lang/crates.io-index" 223 + checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" 224 + dependencies = [ 225 + "aho-corasick", 226 + "memchr", 227 + "regex-syntax 0.8.5", 228 + ] 229 + 230 + [[package]] 231 + name = "regex-syntax" 232 + version = "0.6.29" 233 + source = "registry+https://github.com/rust-lang/crates.io-index" 234 + checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" 235 + 236 + [[package]] 237 + name = "regex-syntax" 238 + version = "0.8.5" 239 + source = "registry+https://github.com/rust-lang/crates.io-index" 240 + checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" 241 + 242 + [[package]] 243 + name = "rustversion" 244 + version = "1.0.22" 245 + source = "registry+https://github.com/rust-lang/crates.io-index" 246 + checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" 247 + 248 + [[package]] 249 + name = "scoped-tls" 250 + version = "1.0.1" 251 + source = "registry+https://github.com/rust-lang/crates.io-index" 252 + checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" 253 + 254 + [[package]] 255 + name = "sharded-slab" 256 + version = "0.1.7" 257 + source = "registry+https://github.com/rust-lang/crates.io-index" 258 + checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" 259 + dependencies = [ 260 + "lazy_static", 261 + ] 262 + 263 + [[package]] 264 + name = "shlex" 265 + version = "1.3.0" 266 + source = "registry+https://github.com/rust-lang/crates.io-index" 267 + checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" 268 + 269 + [[package]] 270 + name = "smallvec" 271 + version = "1.15.1" 272 + source = "registry+https://github.com/rust-lang/crates.io-index" 273 + checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" 274 + 275 + [[package]] 276 + name = "syn" 277 + version = "2.0.104" 278 + source = "registry+https://github.com/rust-lang/crates.io-index" 279 + checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40" 280 + dependencies = [ 281 + "proc-macro2", 282 + "quote", 283 + "unicode-ident", 284 + ] 285 + 286 + [[package]] 287 + name = "thread_local" 288 + version = "1.1.9" 289 + source = "registry+https://github.com/rust-lang/crates.io-index" 290 + checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" 291 + dependencies = [ 292 + "cfg-if", 293 + ] 294 + 295 + [[package]] 296 + name = "tracing" 297 + version = "0.1.41" 298 + source = "registry+https://github.com/rust-lang/crates.io-index" 299 + checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" 300 + dependencies = [ 301 + "pin-project-lite", 302 + "tracing-core", 303 + ] 304 + 305 + [[package]] 306 + name = "tracing-core" 307 + version = "0.1.34" 308 + source = "registry+https://github.com/rust-lang/crates.io-index" 309 + checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" 310 + dependencies = [ 311 + "once_cell", 312 + "valuable", 313 + ] 314 + 315 + [[package]] 316 + name = "tracing-log" 317 + version = "0.2.0" 318 + source = "registry+https://github.com/rust-lang/crates.io-index" 319 + checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" 320 + dependencies = [ 321 + "log", 322 + "once_cell", 323 + "tracing-core", 324 + ] 325 + 326 + [[package]] 327 + name = "tracing-subscriber" 328 + version = "0.3.19" 329 + source = "registry+https://github.com/rust-lang/crates.io-index" 330 + checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" 331 + dependencies = [ 332 + "matchers", 333 + "nu-ansi-term", 334 + "once_cell", 335 + "regex", 336 + "sharded-slab", 337 + "smallvec", 338 + "thread_local", 339 + "tracing", 340 + "tracing-core", 341 + "tracing-log", 342 + ] 343 + 344 + [[package]] 345 + name = "unicode-ident" 346 + version = "1.0.18" 347 + source = "registry+https://github.com/rust-lang/crates.io-index" 348 + checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" 349 + 350 + [[package]] 351 + name = "valuable" 352 + version = "0.1.1" 353 + source = "registry+https://github.com/rust-lang/crates.io-index" 354 + checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" 355 + 356 + [[package]] 357 + name = "winapi" 358 + version = "0.3.9" 359 + source = "registry+https://github.com/rust-lang/crates.io-index" 360 + checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" 361 + dependencies = [ 362 + "winapi-i686-pc-windows-gnu", 363 + "winapi-x86_64-pc-windows-gnu", 364 + ] 365 + 366 + [[package]] 367 + name = "winapi-i686-pc-windows-gnu" 368 + version = "0.4.0" 369 + source = "registry+https://github.com/rust-lang/crates.io-index" 370 + checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" 371 + 372 + [[package]] 373 + name = "winapi-x86_64-pc-windows-gnu" 374 + version = "0.4.0" 375 + source = "registry+https://github.com/rust-lang/crates.io-index" 376 + checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" 377 + 378 + [[package]] 379 + name = "windows" 380 + version = "0.61.3" 381 + source = "registry+https://github.com/rust-lang/crates.io-index" 382 + checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" 383 + dependencies = [ 384 + "windows-collections", 385 + "windows-core", 386 + "windows-future", 387 + "windows-link", 388 + "windows-numerics", 389 + ] 390 + 391 + [[package]] 392 + name = "windows-collections" 393 + version = "0.2.0" 394 + source = "registry+https://github.com/rust-lang/crates.io-index" 395 + checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" 396 + dependencies = [ 397 + "windows-core", 398 + ] 399 + 400 + [[package]] 401 + name = "windows-core" 402 + version = "0.61.2" 403 + source = "registry+https://github.com/rust-lang/crates.io-index" 404 + checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" 405 + dependencies = [ 406 + "windows-implement", 407 + "windows-interface", 408 + "windows-link", 409 + "windows-result", 410 + "windows-strings", 411 + ] 412 + 413 + [[package]] 414 + name = "windows-future" 415 + version = "0.2.1" 416 + source = "registry+https://github.com/rust-lang/crates.io-index" 417 + checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" 418 + dependencies = [ 419 + "windows-core", 420 + "windows-link", 421 + "windows-threading", 422 + ] 423 + 424 + [[package]] 425 + name = "windows-implement" 426 + version = "0.60.0" 427 + source = "registry+https://github.com/rust-lang/crates.io-index" 428 + checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" 429 + dependencies = [ 430 + "proc-macro2", 431 + "quote", 432 + "syn", 433 + ] 434 + 435 + [[package]] 436 + name = "windows-interface" 437 + version = "0.59.1" 438 + source = "registry+https://github.com/rust-lang/crates.io-index" 439 + checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" 440 + dependencies = [ 441 + "proc-macro2", 442 + "quote", 443 + "syn", 444 + ] 445 + 446 + [[package]] 447 + name = "windows-link" 448 + version = "0.1.3" 449 + source = "registry+https://github.com/rust-lang/crates.io-index" 450 + checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" 451 + 452 + [[package]] 453 + name = "windows-numerics" 454 + version = "0.2.0" 455 + source = "registry+https://github.com/rust-lang/crates.io-index" 456 + checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" 457 + dependencies = [ 458 + "windows-core", 459 + "windows-link", 460 + ] 461 + 462 + [[package]] 463 + name = "windows-result" 464 + version = "0.3.4" 465 + source = "registry+https://github.com/rust-lang/crates.io-index" 466 + checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" 467 + dependencies = [ 468 + "windows-link", 469 + ] 470 + 471 + [[package]] 472 + name = "windows-strings" 473 + version = "0.4.2" 474 + source = "registry+https://github.com/rust-lang/crates.io-index" 475 + checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" 476 + dependencies = [ 477 + "windows-link", 478 + ] 479 + 480 + [[package]] 481 + name = "windows-threading" 19 482 version = "0.1.0" 483 + source = "registry+https://github.com/rust-lang/crates.io-index" 484 + checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" 20 485 dependencies = [ 21 - "futures-core", 486 + "windows-link", 22 487 ]
+10 -2
Cargo.toml
··· 1 1 [workspace] 2 2 resolver = "3" 3 - members = [ "futures-combinators", "futures-core", "futures-util"] 3 + members = [ "futures", "futures-combinators", "futures-compat", "futures-core", "futures-derive", "futures-util", "lifetime-guard"] 4 4 5 5 [workspace.package] 6 - version = "0.1.0" 6 + version = "0.0.2" 7 7 rust-version = "1.87" 8 8 edition = "2024" 9 9 license = "MIT OR Apache-2.0" ··· 11 11 repository = "https://github.com/AngleSideAngle/bcsc" 12 12 homepage = "https://github.com/AngleSideAngle/bcsc" 13 13 14 + [workspace.dependencies] 15 + # futures = { path = "futures", version = "0.0.2" } 16 + futures-combinators = { path = "futures-combinators", version = "0.0.2" } 17 + futures-compat = { path = "futures-compat", version = "0.0.2" } 18 + futures-core = { path = "futures-core", version = "0.0.2" } 19 + futures-derive = { path = "futures-derive", version = "0.0.2" } 20 + futures-util = { path = "futures-util", version = "0.0.2" } 21 + lifetime-guard = { path = "lifetime-guard", version = "0.0.2" }
+162 -9
README.md
··· 82 82 83 83 New async primitives that disallow intra-task concurrency, clone of `futures` and `futures-concurrency` for the new primitives. 84 84 85 - ## TODO: 86 - - [x] ScopedFuture 87 - - [ ] static combinators (Join Race etc), see futures-concurrency 88 - - [ ] `#[bsync]` or some compiler ScopedFuture generation 89 - - [ ] growable combinators (eg. `FutureGroup`, `FuturesUnordered`) (require alloc?) 90 - - [ ] unsound (needs `Forget`) multithreading 91 - - [ ] "rethinking async rust" 92 - - [ ] all of the above for streams 93 - - [ ] rfc? 94 85 95 86 channels: need lifetimed receievers, probably needs `Forget` (arc-like channels would be unsafe) 96 87 ··· 141 132 - need tons of interior mutability, since immutable/can't move means `poll` cannot take `&mut self`, cells everywhere 142 133 - nvm lots of unsafe code, but nothing really unsound 143 134 - potentially bad error messages? stuff like `join!` will have to output code that manually sets up the waker self ref 135 + 136 + ## TODO: 137 + - [x] ScopedFuture 138 + - [x] static combinators (Join Race etc), see futures-concurrency 139 + - [x] `#[async_scoped]` or some compiler ScopedFuture generation 140 + - [ ] doubly linked list waker registration 141 + - [ ] repeating static time reactors - eg. make event poll every N seconds 142 + - [ ] io uring reactors 143 + - [ ] growable combinators (eg. `FutureGroup`, `FuturesUnordered`) (require alloc?) 144 + - [ ] unsound (needs `Forget`) multithreading 145 + - [ ] "rethinking async rust" 146 + - [ ] all of the above for streams 147 + - [ ] rfc? 148 + 149 + # Chapter 3 150 + 151 + man this really sucks 152 + 153 + i need better things to do 154 + 155 + issues from ch 2 156 + - works great[*] 157 + - *: incompatible with event loop - something still has to poll 158 + - we're back to doubly linked list of waker registration in event loop 159 + - this requires Forget 160 + - ScopedFuture - Future interop sucks 161 + 162 + 163 + structured concurrency with regular combinators: 164 + - scope holds tasks 165 + - scope cancels tasks when dropped 166 + - tasks are ran by central executor 167 + 168 + pure structured concurrency with borrow checker: 169 + - high level block_on(), any task wake wakes every level up 170 + - tasks have events 171 + 172 + how do the tasks register to an event loop? they don't fuck 173 + 174 + 175 + ```rust 176 + struct Task<F: Future> { 177 + inner: F, 178 + prev: *const Task, 179 + next: *const Task, 180 + waker: Waker, 181 + } 182 + ``` 183 + 184 + &waker is passed into all sub-tasks, calling wake clone etc panics!!! 185 + this is pretty jank 186 + 187 + also waker doesn't have a lifetime so a safe code could easily register to external source that outlives the task 188 + this is unsound 189 + 190 + we need 191 + ```rust 192 + struct WakerRegister { 193 + prev: *const WakerRegister, 194 + next: *const WakerRegister, 195 + } 196 + ``` 197 + 198 + # Borrow Checked Structured Concurrency 199 + 200 + An async system needs to have the following components: 201 + 202 + - event loop : polls events and schedules tasks when they are ready 203 + - tasks : state machines that progress and can await events from the event loop 204 + - task combinators : tasks that compose other tasks into useful logic structures 205 + 206 + Tasks will register themselves to events on the event loop, which will need to outlive tasks and register pointers to wake the tasks, so that they can again be polled. 207 + This is incompatible with the borrow checker because the task pointers (wakers) are being stored inside an event loop with a lifetime that may exceed the tasks'. 208 + 209 + `Waker` is effectively a `*const dyn Wake`. It is implemented using a custom `RawWakerVTable` rather than a `dyn` ptr to allow for `Wake::wake(self)`, which is not object safe. 210 + This method is necessary for runtime implementations that rely on the wakers to be effectively `Arc<Task>`, since `wake(self)` consumes `self` and decrements the reference count. 211 + 212 + There are two types of sound async runtimes that currently exist in rust: 213 + 214 + [tokio](https://github.com/tokio-rs) and [smol](https://github.com/smol-rs) work using the afformentioned reference counting system to ensure wakers aren't dangling pointers to tasks that no longer exist. 215 + 216 + [embassy](https://github.com/embassy-rs/embassy) and [rtic](https://github.com/rtic-rs/rtic) work by ensuring tasks are stored in `static` task pools for `N` tasks. Scheduled tasks are represented by an intrusively linked list to avoid allocation, and wakers can't be dangling pointers because completed tasks will refuse to add themselves back to the linked list, or will be replaced by a new task. This is useful in environments where it is desirable to avoid heap allocation, but requires the user annotate the maximum number of a specific task that can exist at one time, and fails to spawn tasks when they exceed that limit. 217 + 218 + An async runtime where futures are allocated to the stack cannot be sound under this model because `Future::poll` allows any safe `Future` implementation to store or clone wakers wherever they want, which become dangling pointers after the stack allocated future goes out of scope. In order to prevent this, we must have a circular reference, where the task (`Task<'scope>: Wake`) contains a `&'scope Waker` and the `Waker` contains `*const dyn Wake`. For that to be safe, the `Waker` must never be moved. This cannot be possible because something needs to register the waker: 219 + 220 + ```rust 221 + // waker is moved 222 + poll(self: Pin<&mut Self>, cx: &mut Context<'_> /* '_ is the duration of the poll fn */) -> Poll<Self::Output> { 223 + // waker is moved! 224 + // can't register &Waker bc we run into the same problem, 225 + // and the waker only lives for '_ 226 + store.register(cx.waker()) 227 + } 228 + ``` 229 + 230 + Effectively, by saying our waker can't move, we are saying it must be stored by the task, which means it can't be a useful waker. Instead, what we could do is have a waker-register (verb, not noun) that facilitates the binding of an immovable waker to an immovable task, where the waker is guaranteed to outlive the task: 231 + 232 + ```rust 233 + pub trait Wake<'task> { 234 + fn wake(&self); 235 + fn register_waker(&mut self, waker: &'task Waker); 236 + } 237 + 238 + pub struct Waker { 239 + task: *const dyn Wake, 240 + valid: bool, // task is in charge of invalidating when it goes out of scope 241 + } 242 + 243 + pub struct WakerRegistration<'poll> { 244 + task: &'poll mut dyn Wake, 245 + } 246 + 247 + impl<'poll> WakerRegistration<'poll> { 248 + pub fn register<'task>(self, slot: &'task Waker) 249 + where 250 + 'task: 'poll, 251 + Self: 'task 252 + { 253 + *slot = Waker::new(self.task as *const dyn Wake); 254 + *task.register_waker(slot) 255 + } 256 + } 257 + ``` 258 + 259 + This system works better because `WakerRegistration` only lives 260 + 261 + 262 + 263 + Experienced rust programmers might be reading this and thinking I am stupid (true) because `Forget` 264 + 265 + An astute (soon to be disappointed) reader might be thinking, as I did when I first learned about this sytem, "what if we ensured that there was only one `Waker` per `Task`, and gave the task a pointer to the waker, so that it could disable the waker when dropped?" 266 + 267 + Unfortunately, there are a multitude of issues with this system 268 + 269 + - In order to hold a pointer to the Waker from the 270 + - Preventing a `Waker` from moving means panicking on the 271 + 272 + Even if it was guaranteed that wakers could not be moved or `cloned` (by panicking on `clone`), and registration occured via `&Waker`, the task would still be unable to 273 + 274 + https://conradludgate.com/posts/async-stack 275 + 276 + ## Structured Concurrency 277 + 278 + https://trio.discourse.group/t/discussion-notes-on-structured-concurrency-or-go-statement-considered-harmful/25 279 + https://kotlinlang.org/docs/coroutines-basics.html 280 + 281 + Notably, the structured concurrency pattern fits very nicely with our hypothetical unsound stack based async runtime. 282 + 283 + ## WeakCell Pattern & Forget trait 284 + 285 + https://github.com/rust-lang/rfcs/pull/3782 286 + 287 + 288 + There are two solutions: 289 + 290 + - `Wakers` panic on `clone()` 291 + 292 + ## Waker allocation problem & intra task concurrency 293 + 294 + we can't do intra task concurrency because WeakRegistrations 295 + 296 +
+22 -66
flake.lock
··· 1 1 { 2 2 "nodes": { 3 - "flake-utils": { 4 - "inputs": { 5 - "systems": "systems" 6 - }, 7 - "locked": { 8 - "lastModified": 1731533236, 9 - "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", 10 - "owner": "numtide", 11 - "repo": "flake-utils", 12 - "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", 13 - "type": "github" 14 - }, 15 - "original": { 16 - "owner": "numtide", 17 - "repo": "flake-utils", 18 - "type": "github" 19 - } 20 - }, 21 3 "naersk": { 22 4 "inputs": { 23 - "nixpkgs": "nixpkgs" 5 + "nixpkgs": [ 6 + "nixpkgs" 7 + ] 24 8 }, 25 9 "locked": { 26 10 "lastModified": 1752249768, ··· 38 22 }, 39 23 "nixpkgs": { 40 24 "locked": { 41 - "lastModified": 1752077645, 42 - "narHash": "sha256-HM791ZQtXV93xtCY+ZxG1REzhQenSQO020cu6rHtAPk=", 43 - "owner": "NixOS", 44 - "repo": "nixpkgs", 45 - "rev": "be9e214982e20b8310878ac2baa063a961c1bdf6", 46 - "type": "github" 47 - }, 48 - "original": { 49 - "owner": "NixOS", 50 - "ref": "nixpkgs-unstable", 51 - "repo": "nixpkgs", 52 - "type": "github" 53 - } 54 - }, 55 - "nixpkgs-mozilla": { 56 - "flake": false, 57 - "locked": { 58 - "lastModified": 1744624473, 59 - "narHash": "sha256-S6zT/w5SyAkJ//dYdjbrXgm+6Vkd/k7qqUl4WgZ6jjk=", 60 - "owner": "mozilla", 61 - "repo": "nixpkgs-mozilla", 62 - "rev": "2292d4b35aa854e312ad2e95c4bb5c293656f21a", 63 - "type": "github" 64 - }, 65 - "original": { 66 - "owner": "mozilla", 67 - "repo": "nixpkgs-mozilla", 68 - "type": "github" 69 - } 70 - }, 71 - "nixpkgs_2": { 72 - "locked": { 73 - "lastModified": 1752077645, 74 - "narHash": "sha256-HM791ZQtXV93xtCY+ZxG1REzhQenSQO020cu6rHtAPk=", 25 + "lastModified": 1756787288, 26 + "narHash": "sha256-rw/PHa1cqiePdBxhF66V7R+WAP8WekQ0mCDG4CFqT8Y=", 75 27 "owner": "NixOS", 76 28 "repo": "nixpkgs", 77 - "rev": "be9e214982e20b8310878ac2baa063a961c1bdf6", 29 + "rev": "d0fc30899600b9b3466ddb260fd83deb486c32f1", 78 30 "type": "github" 79 31 }, 80 32 "original": { 81 33 "owner": "NixOS", 82 - "ref": "nixpkgs-unstable", 34 + "ref": "nixos-unstable", 83 35 "repo": "nixpkgs", 84 36 "type": "github" 85 37 } 86 38 }, 87 39 "root": { 88 40 "inputs": { 89 - "flake-utils": "flake-utils", 90 41 "naersk": "naersk", 91 - "nixpkgs": "nixpkgs_2", 92 - "nixpkgs-mozilla": "nixpkgs-mozilla" 42 + "nixpkgs": "nixpkgs", 43 + "rust-overlay": "rust-overlay" 93 44 } 94 45 }, 95 - "systems": { 46 + "rust-overlay": { 47 + "inputs": { 48 + "nixpkgs": [ 49 + "nixpkgs" 50 + ] 51 + }, 96 52 "locked": { 97 - "lastModified": 1681028828, 98 - "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", 99 - "owner": "nix-systems", 100 - "repo": "default", 101 - "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", 53 + "lastModified": 1757039615, 54 + "narHash": "sha256-qm53+EUFfzyF8F0MEscHGqf9tx462GV3/zUZrn9wiQU=", 55 + "owner": "oxalica", 56 + "repo": "rust-overlay", 57 + "rev": "4486e04adbb4b0e39f593767f2c36e2211003d01", 102 58 "type": "github" 103 59 }, 104 60 "original": { 105 - "owner": "nix-systems", 106 - "repo": "default", 61 + "owner": "oxalica", 62 + "repo": "rust-overlay", 107 63 "type": "github" 108 64 } 109 65 }
+127 -27
flake.nix
··· 1 1 { 2 + description = "Stack allocated structured concurrency"; 3 + 2 4 inputs = { 3 - flake-utils.url = "github:numtide/flake-utils"; 4 - naersk.url = "github:nix-community/naersk"; 5 - nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable"; 6 - nixpkgs-mozilla = { 7 - url = "github:mozilla/nixpkgs-mozilla"; 8 - flake = false; 5 + nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; 6 + 7 + rust-overlay = { 8 + url = "github:oxalica/rust-overlay"; 9 + inputs.nixpkgs.follows = "nixpkgs"; 10 + }; 11 + 12 + naersk = { 13 + url = "github:nix-community/naersk"; 14 + inputs.nixpkgs.follows = "nixpkgs"; 9 15 }; 10 16 }; 11 17 12 - outputs = { self, flake-utils, naersk, nixpkgs, nixpkgs-mozilla }: 13 - flake-utils.lib.eachDefaultSystem (system: 14 - let 15 - pkgs = (import nixpkgs) { 16 - inherit system; 17 - overlays = [ 18 - (import nixpkgs-mozilla) 19 - ]; 20 - }; 18 + outputs = { nixpkgs, rust-overlay, naersk, ... }: 19 + let 20 + forAllSystems = function: 21 + nixpkgs.lib.genAttrs [ 22 + "x86_64-linux" 23 + "aarch64-linux" 24 + "riscv64-linux" 25 + "aarch64-darwin" 26 + ] (system: function nixpkgs.legacyPackages.${system}); 27 + 28 + forEachSystem = forAllSystems (pkgs: 29 + let 30 + pkgs' = pkgs.extend rust-overlay.overlays.default; 31 + 32 + rust-stable = pkgs'.rust-bin.stable.latest.default.override { 33 + extensions = [ "rust-src" "clippy" "rustfmt" ]; 34 + }; 35 + 36 + rust-nightly = pkgs'.rust-bin.nightly.latest.default.override { 37 + extensions = [ "rust-src" "clippy" "rustfmt" "miri" ]; 38 + }; 21 39 22 - naersk' = pkgs.callPackage naersk {}; 40 + naersk-stable = pkgs'.callPackage naersk { 41 + cargo = rust-stable; 42 + rustc = rust-stable; 43 + }; 23 44 24 - in { 25 - # For `nix build` & `nix run`: 26 - defaultPackage = naersk'.buildPackage { 27 - src = ./.; 28 - }; 45 + naersk-nightly = pkgs'.callPackage naersk { 46 + cargo = rust-nightly; 47 + rustc = rust-nightly; 48 + }; 29 49 30 - # For `nix develop`: 31 - devShell = pkgs.mkShell { 32 - nativeBuildInputs = with pkgs; [ rustc cargo clippy rustfmt rust-analyzer cargo-expand ]; 33 - }; 34 - } 35 - ); 50 + buildInputs = []; 51 + nativeBuildInputs = []; 52 + 53 + # Production package built with stable Rust 54 + package = naersk-stable.buildPackage { 55 + src = ./.; 56 + inherit buildInputs nativeBuildInputs; 57 + }; 58 + 59 + # Development package built with nightly (for Miri compatibility) 60 + packageNightly = naersk-nightly.buildPackage { 61 + src = ./.; 62 + inherit buildInputs nativeBuildInputs; 63 + }; 64 + 65 + in 66 + { 67 + packages = { 68 + default = package; 69 + stable = package; 70 + nightly = packageNightly; 71 + }; 72 + 73 + devShells = { 74 + default = pkgs'.mkShell { 75 + buildInputs = buildInputs ++ [ 76 + rust-stable 77 + pkgs'.rust-analyzer 78 + ]; 79 + nativeBuildInputs = nativeBuildInputs; 80 + 81 + # RUST_SRC_PATH = "${rust-stable}/lib/rustlib/src/rust/library"; 82 + }; 83 + 84 + nightly = pkgs'.mkShell { 85 + buildInputs = buildInputs ++ [ 86 + rust-nightly 87 + # pkgs'.rust-analyzer 88 + ]; 89 + nativeBuildInputs = nativeBuildInputs; 90 + 91 + # RUST_SRC_PATH = "${rust-nightly}/lib/rustlib/src/rust/library"; 92 + MIRIFLAGS = "-Zmiri-strict-provenance"; 93 + 94 + shellHook = '' 95 + echo "Nightly Rust environment with Miri" 96 + ''; 97 + }; 98 + }; 99 + 100 + checks = { 101 + test = pkgs'.runCommand "cargo-test" { 102 + buildInputs = [ rust-stable ] ++ buildInputs; 103 + nativeBuildInputs = nativeBuildInputs; 104 + } '' 105 + cargo test --release 106 + ''; 107 + 108 + miri = pkgs'.runCommand "cargo-miri-test" { 109 + buildInputs = [ rust-nightly ] ++ buildInputs; 110 + nativeBuildInputs = nativeBuildInputs; 111 + } '' 112 + cargo miri test --release 113 + ''; 114 + 115 + clippy = pkgs'.runCommand "cargo-clippy" { 116 + buildInputs = [ rust-stable ] ++ buildInputs; 117 + nativeBuildInputs = nativeBuildInputs; 118 + } '' 119 + cargo clippy --deny 120 + ''; 121 + 122 + fmt = pkgs'.runCommand "cargo-fmt" { 123 + buildInputs = [ rust-stable ]; 124 + } '' 125 + cargo fmt --all --check 126 + ''; 127 + }; 128 + } 129 + ); 130 + in 131 + { 132 + packages = forAllSystems (pkgs: forEachSystem.${pkgs.system}.packages); 133 + devShells = forAllSystems (pkgs: forEachSystem.${pkgs.system}.devShells); 134 + checks = forAllSystems (pkgs: forEachSystem.${pkgs.system}.checks); 135 + }; 36 136 }
+4
futures/Cargo.toml
··· 9 9 homepage.workspace = true 10 10 11 11 [dependencies] 12 + futures-core = { workspace = true } 13 + futures-combinators = { workspace = true } 14 + futures-compat = { workspace = true } 15 + futures-derive = { workspace = true }
-1
futures/src/combinators/mod.rs
··· 1 - mod join;
+14 -34
futures/src/lib.rs
··· 1 - mod combinators; 2 - mod future; 3 - mod utils; 4 - 5 - /// from yoshuawuyts/futures-concurrency 6 - /// Wait for all futures to complete. 7 - /// 8 - /// Awaits multiple futures simultaneously, returning the output of the futures 9 - /// in the same container type they were created once all complete. 10 - 11 - // scoped future combinators: 12 - // 13 - // Join<N> 14 - // TryJoin 15 - // Race 16 - // RaceOk 17 - // 18 - // add Deadline(a, rest) (deadline_against()) 19 - // also functionality like (a, b, c).join().race_against(d, e, f) 20 - // 21 - // UnorderedJoinQueueStream? is this VecJoinStream? 22 - // OrderedJoinQueueStream 1 + #![no_std] 23 2 24 - // pub trait ScopedStream<'scope> { 25 - // type Item; 3 + use futures_compat::LocalWaker; 4 + use futures_derive::async_function; 26 5 27 - // fn poll_next(self: Pin<&mut Self>, cx: &'scope mut dyn ScopedWake) -> Poll<Option<Self::Item>>; 28 - // } 6 + async fn evil() {} 29 7 30 - #[cfg(test)] 31 - mod tests { 32 - use super::*; 8 + #[async_function] 9 + fn inner(a: i32, b: &i32) -> i32 { 10 + // evil().await; 11 + 1 12 + } 33 13 34 - #[test] 35 - fn it_works() { 36 - let result = add(2, 2); 37 - assert_eq!(result, 4); 38 - } 14 + #[async_function] 15 + fn test(a: i32, b: &i32) -> i32 { 16 + futures_derive::async_block! { let _ = 1 + *b; 2 }.await 39 17 } 18 + 19 + // fn test2<'a>(a: i32) {}
+4 -2
futures-combinators/Cargo.toml
··· 9 9 homepage.workspace = true 10 10 11 11 [dependencies] 12 - futures-core = { path = "../futures-core" } 13 - futures-util = { path = "../futures-util" } 12 + futures-core = { workspace = true } 13 + futures-compat = { workspace = true } 14 + futures-util = { workspace = true } 15 + lifetime-guard = { workspace = true }
+108 -79
futures-combinators/src/join.rs
··· 1 - use futures_core::{ScopedFuture, Wake}; 2 - use futures_util::{MaybeDone, MaybeDoneState, maybe_done}; 3 - use std::{cell::Cell, task::Poll}; 1 + use crate::wake::WakeArray; 2 + use futures_compat::LocalWaker; 3 + use futures_core::FusedFuture; 4 + use futures_util::maybe_done::MaybeDone; 5 + use futures_util::maybe_done::maybe_done; 6 + use std::pin::Pin; 7 + use std::task::Poll; 4 8 5 9 /// from [futures-concurrency](https://github.com/yoshuawuyts/futures-concurrency/tree/main) 6 10 /// Wait for all futures to complete. 7 11 /// 8 12 /// Awaits multiple futures simultaneously, returning the output of the futures 9 13 /// in the same container type they were created once all complete. 10 - pub trait Join<'scope> { 14 + pub trait Join { 11 15 /// The resulting output type. 12 16 type Output; 13 17 14 18 /// The [`ScopedFuture`] implementation returned by this method. 15 - type Future: ScopedFuture<'scope, Output = Self::Output>; 19 + type Future: futures_core::Future<LocalWaker, Output = Self::Output>; 16 20 17 21 /// Waits for multiple futures to complete. 18 22 /// ··· 23 27 fn join(self) -> Self::Future; 24 28 } 25 29 26 - struct WakeStore<'scope> { 27 - parent: Cell<Option<&'scope dyn Wake<'scope>>>, 28 - ready: Cell<bool>, 29 - } 30 - 31 - impl<'scope> WakeStore<'scope> { 32 - fn new() -> Self { 33 - Self { 34 - parent: Option::None.into(), 35 - ready: true.into(), 36 - } 37 - } 38 - fn take_ready(&self) -> bool { 39 - self.ready.replace(false) 30 + pub trait JoinExt { 31 + fn along_with<Fut>(self, other: Fut) -> Join2<Self, Fut> 32 + where 33 + Self: Sized + futures_core::Future<LocalWaker>, 34 + Fut: futures_core::Future<LocalWaker>, 35 + { 36 + (self, other).join() 40 37 } 41 38 } 42 39 43 - impl<'scope> Wake<'scope> for WakeStore<'scope> { 44 - fn wake(&self) { 45 - self.ready.replace(true); 46 - if let Some(parent) = &self.parent.get() { 47 - parent.wake(); 48 - } 49 - } 50 - } 40 + impl<T> JoinExt for T where T: futures_core::Future<LocalWaker> {} 51 41 52 42 macro_rules! impl_join_tuple { 53 - ($namespace: ident $StructName:ident $($F:ident)+) => { 54 - 43 + ($namespace:ident $StructName:ident $($F:ident)+) => { 55 44 mod $namespace { 56 - use super::*; 57 - 58 - #[allow(non_snake_case)] 59 - pub struct Wakers<'scope> { 60 - $(pub $F: WakeStore<'scope>,)* 61 - } 62 - 63 - // this is so stupid 64 - #[allow(non_snake_case)] 65 - pub struct WakerRefs<'scope> { 66 - $(pub $F: Cell<Option<&'scope dyn Wake<'scope>>>,)* 67 - } 45 + #[repr(u8)] 46 + pub(super) enum Indexes { $($F,)+ } 47 + pub(super) const LEN: usize = [$(Indexes::$F,)+].len(); 68 48 } 69 49 70 50 #[allow(non_snake_case)] 71 51 #[must_use = "futures do nothing unless you `.await` or poll them"] 72 - pub struct $StructName<'scope, $($F: ScopedFuture<'scope>),+> { 73 - $($F: MaybeDone<'scope, $F>,)* 74 - wakers: $namespace::Wakers<'scope>, 75 - refs: $namespace::WakerRefs<'scope>, 52 + pub struct $StructName<$($F: futures_core::Future<LocalWaker>),+> { 53 + $($F: MaybeDone<$F>,)* 54 + wake_array: WakeArray<{$namespace::LEN}>, 76 55 } 77 56 78 - impl<'scope, $($F: ScopedFuture<'scope> + 'scope),+> ScopedFuture<'scope> 79 - for $StructName<'scope, $($F),+> 57 + impl<$($F: futures_core::Future<LocalWaker>),+> futures_core::Future<LocalWaker> for $StructName<$($F),+> 80 58 { 81 59 type Output = ($($F::Output),+); 82 60 83 - fn poll(&'scope self, wake: &'scope dyn Wake<'scope>) -> Poll<Self::Output> { 61 + #[allow(non_snake_case)] 62 + fn poll(self: Pin<&mut Self>, waker: Pin<&LocalWaker>) -> Poll<Self::Output> { 63 + let this = unsafe { self.get_unchecked_mut() }; 64 + 65 + let wake_array = unsafe { Pin::new_unchecked(&this.wake_array) }; 66 + $( 67 + // TODO debug_assert_matches is nightly https://github.com/rust-lang/rust/issues/82775 68 + debug_assert!(!matches!(this.$F, MaybeDone::Gone), "do not poll futures after they return Poll::Ready"); 69 + let mut $F = unsafe { Pin::new_unchecked(&mut this.$F) }; 70 + )+ 71 + 72 + wake_array.register_parent(waker); 73 + 84 74 let mut ready = true; 85 75 86 76 $( 87 - self.wakers.$F.parent.replace(Some(wake)) ; 88 - self.refs.$F.replace(Some(&self.wakers.$F)); 77 + let index = $namespace::Indexes::$F as usize; 78 + let waker = unsafe { wake_array.child_guard_ptr(index).unwrap_unchecked() }; 89 79 90 - // # SAFETY 91 - // `fut` MUST NOT LIVE PAST THIS BLOCK 92 - // OTHER MaybeDone METHODS MUTATE `self` AND `fut` HOLDS 93 - // IMMUTABLE REFERENCE INVARIANT 94 - if let MaybeDoneState::Future(fut) = unsafe { self.$F.get_state() } { 95 - ready &= if self.wakers.$F.take_ready() { 96 - // by polling the future, we create our self referentials truct for lifetime 'scope 97 - // # SAFETY 98 - // unwrap_unchecked is safe because we just put a Some value into our refs.$F 99 - // so it is guaranteed to be Some 100 - fut.poll(unsafe { (&self.refs.$F.get()).unwrap_unchecked() }).is_ready() 101 - } else { 102 - false 103 - }; 104 - } 80 + // ready if MaybeDone is Done or just completed (converted to Done) 81 + // unsafe / against Future api contract to poll after Gone/Future is finished 82 + ready &= if unsafe { dbg!(wake_array.take_woken(index).unwrap_unchecked()) } { 83 + $F.as_mut().poll(waker).is_ready() 84 + } else { 85 + $F.is_terminated() 86 + }; 105 87 )+ 106 88 107 89 if ready { 108 90 Poll::Ready(( 109 91 $( 110 - // # SAFETY 111 - // `ready == true` when all futures are already 112 - // complete or just complete. Once not `MaybeDoneState::Future`, futures transition to `MaybeDoneState::Done`. We don't poll them after, or take their outputs so we know the result of `take_output` must be `Some` 92 + // SAFETY: 93 + // `ready == true` when all futures are complete. 94 + // Once a future is not `MaybeDoneState::Future`, it transitions to `Done`, 95 + // so we know the result of `take_output` must be `Some`. 113 96 unsafe { 114 - self.$F 115 - .take_output() 116 - .unwrap_unchecked() 97 + $F.take_output().unwrap_unchecked() 117 98 }, 118 99 )* 119 100 )) ··· 123 104 } 124 105 } 125 106 126 - impl<'scope, $($F: ScopedFuture<'scope> + 'scope),+> Join<'scope> for ($($F),+) { 107 + impl<$($F: futures_core::Future<LocalWaker>),+> Join for ($($F),+) { 127 108 type Output = ($($F::Output),*); 128 - type Future = $StructName<'scope, $($F),+>; 109 + type Future = $StructName<$($F),+>; 129 110 130 111 #[allow(non_snake_case)] 131 112 fn join(self) -> Self::Future { ··· 133 114 134 115 $StructName { 135 116 $($F: maybe_done($F),)* 136 - wakers: $namespace::Wakers { $($F: WakeStore::new(),)* }, 137 - refs: $namespace::WakerRefs { $($F: Option::None.into(),)* } 117 + wake_array: WakeArray::new(), 138 118 } 139 119 } 140 120 } ··· 155 135 156 136 #[cfg(test)] 157 137 mod tests { 158 - use futures_util::poll_fn; 138 + #![no_std] 139 + 140 + use futures_core::Future; 141 + use futures_util::{dummy_guard, poll_fn}; 142 + 143 + use crate::wake::local_wake; 159 144 160 145 use super::*; 146 + 147 + use std::pin; 161 148 162 149 #[test] 163 - fn basic() { 150 + fn counters() { 151 + let mut x1 = 0; 152 + let mut x2 = 0; 153 + let f1 = poll_fn(|waker| { 154 + local_wake(waker); 155 + x1 += 1; 156 + if x1 == 4 { 157 + Poll::Ready(x1) 158 + } else { 159 + Poll::Pending 160 + } 161 + }); 162 + let f2 = poll_fn(|waker| { 163 + local_wake(waker); 164 + x2 += 1; 165 + if x2 == 5 { 166 + Poll::Ready(x2) 167 + } else { 168 + Poll::Pending 169 + } 170 + }); 171 + let guard = pin::pin!(dummy_guard()); 172 + let mut join = pin::pin!((f1, f2).join()); 173 + for _ in 0..4 { 174 + assert_eq!(join.as_mut().poll(guard.as_ref()), Poll::Pending); 175 + } 176 + assert_eq!(join.poll(guard.as_ref()), Poll::Ready((4, 5))); 177 + } 178 + 179 + #[test] 180 + fn never_wake() { 181 + let f1 = poll_fn(|_| Poll::<i32>::Ready(0)); 182 + let f2 = poll_fn(|_| Poll::<i32>::Pending); 183 + let guard = pin::pin!(dummy_guard()); 184 + let mut join = pin::pin!((f1, f2).join()); 185 + for _ in 0..10 { 186 + assert_eq!(join.as_mut().poll(guard.as_ref()), Poll::Pending); 187 + } 188 + } 189 + 190 + #[test] 191 + fn immediate() { 164 192 let f1 = poll_fn(|_| Poll::Ready(1)); 165 193 let f2 = poll_fn(|_| Poll::Ready(2)); 166 - let dummy_waker = WakeStore::new(); 167 - assert_eq!((f1, f2).join().poll(&dummy_waker), Poll::Ready((1, 2))); 194 + let join = pin::pin!(f1.along_with(f2)); 195 + let guard = pin::pin!(dummy_guard()); 196 + assert_eq!(join.poll(guard.as_ref()), Poll::Ready((1, 2))); 168 197 } 169 198 }
+6 -1
futures-combinators/src/lib.rs
··· 1 - mod join; 1 + pub mod join; 2 + pub mod race; 3 + mod wake; 4 + 5 + use join::*; 6 + use race::*;
+200
futures-combinators/src/race.rs
··· 1 + use futures_util::LocalWaker; 2 + 3 + use crate::wake::WakeArray; 4 + use std::pin::Pin; 5 + use std::task::Poll; 6 + 7 + /// from [futures-concurrency](https://github.com/yoshuawuyts/futures-concurrency/tree/main) 8 + /// Wait for the first future to complete. 9 + /// 10 + /// Awaits multiple future at once, returning as soon as one completes. The 11 + /// other futures are cancelled. 12 + pub trait Race { 13 + /// The resulting output type. 14 + type Output; 15 + 16 + /// The [`ScopedFuture`] implementation returned by this method. 17 + type Future: futures_core::Future<LocalWaker, Output = Self::Output>; 18 + 19 + /// Wait for the first future to complete. 20 + /// 21 + /// Awaits multiple futures at once, returning as soon as one completes. The 22 + /// other futures are cancelled. 23 + /// 24 + /// This function returns a new future which polls all futures concurrently. 25 + fn race(self) -> Self::Future; 26 + } 27 + 28 + pub trait RaceExt<'scope> { 29 + fn race_with<Fut>(self, other: Fut) -> Race2<Self, Fut> 30 + where 31 + Self: Sized + futures_core::Future<LocalWaker>, 32 + Fut: futures_core::Future<LocalWaker>, 33 + { 34 + (self, other).race() 35 + } 36 + } 37 + 38 + impl<'scope, T> RaceExt<'scope> for T where T: futures_core::Future<LocalWaker> {} 39 + 40 + macro_rules! impl_race_tuple { 41 + ($namespace:ident $StructName:ident $OutputsName:ident $($F:ident)+) => { 42 + mod $namespace { 43 + #[repr(u8)] 44 + pub(super) enum Indexes { $($F,)+ } 45 + pub(super) const LEN: usize = [$(Indexes::$F,)+].len(); 46 + } 47 + 48 + pub enum $OutputsName<$($F,)+> { 49 + $($F($F),)+ 50 + } 51 + 52 + impl<$($F: std::fmt::Debug,)+> std::fmt::Debug for $OutputsName<$($F,)+> { 53 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 54 + match self {$( 55 + Self::$F(x) => 56 + f.debug_tuple(std::stringify!($F)) 57 + .field(x) 58 + .finish(), 59 + )+} 60 + } 61 + } 62 + 63 + impl<$($F: PartialEq,)+> PartialEq for $OutputsName<$($F,)+> { 64 + fn eq(&self, other: &Self) -> bool { 65 + match (self, other) { 66 + $((Self::$F(a), Self::$F(b)) => a == b,)+ 67 + _ => false 68 + } 69 + } 70 + } 71 + 72 + #[allow(non_snake_case)] 73 + #[must_use = "futures do nothing unless you `.await` or poll them"] 74 + pub struct $StructName<$($F: futures_core::Future<LocalWaker>),+> { 75 + $($F: $F,)* 76 + wake_array: WakeArray<{$namespace::LEN}>, 77 + } 78 + 79 + impl<'scope, $($F: futures_core::Future<LocalWaker>),+> futures_core::Future<LocalWaker> 80 + for $StructName<$($F),+> 81 + { 82 + type Output = $OutputsName<$($F::Output,)+>; 83 + 84 + #[allow(non_snake_case)] 85 + fn poll(self: Pin<&mut Self>, waker: Pin<&LocalWaker>) -> Poll<Self::Output> { 86 + let this = unsafe { self.get_unchecked_mut() }; 87 + 88 + let wake_array = unsafe { Pin::new_unchecked(&this.wake_array) }; 89 + $( 90 + let mut $F = unsafe { Pin::new_unchecked(&mut this.$F) }; 91 + )+ 92 + 93 + wake_array.register_parent(waker); 94 + 95 + $( 96 + let index = $namespace::Indexes::$F as usize; 97 + let waker = unsafe { wake_array.child_guard_ptr(index).unwrap_unchecked() }; 98 + 99 + // this is safe because we know index < LEN 100 + if unsafe { wake_array.take_woken(index).unwrap_unchecked() } { 101 + if let Poll::Ready(res) = $F.as_mut().poll(waker) { 102 + return Poll::Ready($OutputsName::$F(res)); 103 + } 104 + } 105 + )+ 106 + 107 + Poll::Pending 108 + } 109 + } 110 + 111 + impl<'scope, $($F: futures_core::Future<LocalWaker>),+> Race for ($($F),+) { 112 + type Output = $OutputsName<$($F::Output),*>; 113 + type Future = $StructName<$($F),+>; 114 + 115 + #[allow(non_snake_case)] 116 + fn race(self) -> Self::Future { 117 + let ($($F),+) = self; 118 + 119 + $StructName { 120 + $($F: $F,)* 121 + wake_array: WakeArray::new(), 122 + } 123 + } 124 + } 125 + }; 126 + } 127 + 128 + impl_race_tuple!(race2 Race2 RaceOutputs2 A B); 129 + impl_race_tuple!(race3 Race3 RaceOutputs3 A B C); 130 + impl_race_tuple!(race4 Race4 RaceOutputs4 A B C D); 131 + impl_race_tuple!(race5 Race5 RaceOutputs5 A B C D E); 132 + impl_race_tuple!(race6 Race6 RaceOutputs6 A B C D E F); 133 + impl_race_tuple!(race7 Race7 RaceOutputs7 A B C D E F G); 134 + impl_race_tuple!(race8 Race8 RaceOutputs8 A B C D E F G H); 135 + impl_race_tuple!(race9 Race9 RaceOutputs9 A B C D E F G H I); 136 + impl_race_tuple!(race10 Race10 RaceOutputs10 A B C D E F G H I J); 137 + impl_race_tuple!(race11 Race11 RaceOutputs11 A B C D E F G H I J K); 138 + impl_race_tuple!(race12 Race12 RaceOutputs12 A B C D E F G H I J K L); 139 + 140 + #[cfg(test)] 141 + mod tests { 142 + #![no_std] 143 + 144 + use std::pin; 145 + 146 + use futures_core::Future; 147 + use futures_util::{dummy_guard, poll_fn}; 148 + 149 + use crate::wake::local_wake; 150 + 151 + use super::*; 152 + 153 + #[test] 154 + fn counters() { 155 + let mut x1 = 0; 156 + let mut x2 = 0; 157 + let f1 = poll_fn(|waker| { 158 + local_wake(waker); 159 + x1 += 1; 160 + if x1 == 4 { 161 + Poll::Ready(x1) 162 + } else { 163 + Poll::Pending 164 + } 165 + }); 166 + let f2 = poll_fn(|waker| { 167 + local_wake(waker); 168 + x2 += 1; 169 + if x2 == 2 { 170 + Poll::Ready(x2) 171 + } else { 172 + Poll::Pending 173 + } 174 + }); 175 + let guard = pin::pin!(dummy_guard()); 176 + let mut race = pin::pin!((f1, f2).race()); 177 + assert_eq!(race.as_mut().poll(guard.as_ref()), Poll::Pending); 178 + assert_eq!(race.poll(guard.as_ref()), Poll::Ready(RaceOutputs2::B(2))); 179 + } 180 + 181 + #[test] 182 + fn never_wake() { 183 + let f1 = poll_fn(|_| Poll::<i32>::Pending); 184 + let f2 = poll_fn(|_| Poll::<i32>::Pending); 185 + let mut race = pin::pin!((f1, f2).race()); 186 + let guard = pin::pin!(dummy_guard()); 187 + for _ in 0..10 { 188 + assert_eq!(race.as_mut().poll(guard.as_ref()), Poll::Pending); 189 + } 190 + } 191 + 192 + #[test] 193 + fn basic() { 194 + let f1 = poll_fn(|_| Poll::Ready(1)); 195 + let f2 = poll_fn(|_| Poll::Ready(2)); 196 + let race = pin::pin!(f1.race_with(f2)); 197 + let guard = pin::pin!(dummy_guard()); 198 + assert_eq!(race.poll(guard.as_ref()), Poll::Ready(RaceOutputs2::A(1))); 199 + } 200 + }
+114
futures-combinators/src/wake.rs
··· 1 + use std::{array, cell::Cell, marker::PhantomPinned, pin::Pin, ptr::NonNull}; 2 + 3 + use futures_compat::{LocalWaker, WakePtr}; 4 + use futures_core::Wake; 5 + use lifetime_guard::{guard::RefGuard, guard::ValueGuard}; 6 + 7 + pub struct WakeArray<const N: usize> { 8 + parent: RefGuard<WakePtr>, 9 + children: [ValueGuard<WakePtr>; N], 10 + stores: [WakeStore; N], 11 + _marker: PhantomPinned, 12 + } 13 + 14 + impl<const N: usize> WakeArray<N> { 15 + pub fn new() -> Self { 16 + Self { 17 + parent: RefGuard::new(), 18 + children: array::from_fn(|_| ValueGuard::new(None)), 19 + stores: array::from_fn(|_| WakeStore::new()), 20 + _marker: PhantomPinned, 21 + } 22 + } 23 + 24 + pub fn register_parent( 25 + self: Pin<&Self>, 26 + parent: Pin<&ValueGuard<WakePtr>>, 27 + ) { 28 + unsafe { Pin::new_unchecked(&self.parent) }.register(parent); 29 + } 30 + 31 + /// Returns pinned reference to child ValueGuard 32 + /// returns None if n is not in 0..N 33 + pub fn child_guard_ptr( 34 + self: Pin<&Self>, 35 + index: usize, 36 + ) -> Option<Pin<&ValueGuard<WakePtr>>> { 37 + // TODO remove bounds checking, break api when https://github.com/rust-lang/rust/issues/123646 38 + if index >= N { 39 + return None; 40 + } 41 + 42 + let wake_store = unsafe { self.stores.get(index).unwrap_unchecked() }; 43 + wake_store.set_parent(&self.parent); 44 + 45 + let wake_store = unsafe { 46 + NonNull::new_unchecked( 47 + wake_store as *const dyn Wake as *mut dyn Wake, 48 + ) 49 + }; 50 + 51 + let child_guard = 52 + unsafe { self.get_ref().children.get(index).unwrap_unchecked() }; 53 + child_guard.set(Some(wake_store)); 54 + 55 + Some(unsafe { Pin::new_unchecked(child_guard) }) 56 + } 57 + 58 + pub fn take_woken(self: Pin<&Self>, index: usize) -> Option<bool> { 59 + self.stores.get(index).map(|store| store.take_woken()) 60 + } 61 + } 62 + 63 + pub struct WakeStore { 64 + wake_parent: Cell<Option<NonNull<RefGuard<WakePtr>>>>, 65 + activated: Cell<bool>, 66 + } 67 + 68 + impl WakeStore { 69 + pub fn new() -> Self { 70 + Self { 71 + wake_parent: Cell::new(None), 72 + activated: Cell::new(true), 73 + } 74 + } 75 + 76 + pub fn set_parent(&self, parent: &RefGuard<WakePtr>) { 77 + self.wake_parent.set(Some(parent.into())); 78 + } 79 + 80 + pub fn take_woken(&self) -> bool { 81 + self.activated.replace(false) 82 + } 83 + } 84 + 85 + impl Wake for WakeStore { 86 + fn wake(&self) { 87 + dbg!("awake?"); 88 + self.activated.set(true); 89 + if let Some(parent) = self 90 + .wake_parent 91 + .get() 92 + .map(|guard_ptr| unsafe { &*guard_ptr.as_ptr() }) 93 + .and_then(|guard| guard.get()) 94 + .flatten() 95 + { 96 + unsafe { &*parent.as_ptr() }.wake(); 97 + } 98 + } 99 + } 100 + 101 + pub fn local_wake(guard: &LocalWaker) { 102 + if let Some(wake) = guard.get() { 103 + unsafe { (*wake.as_ptr()).wake() } 104 + } 105 + } 106 + 107 + // pub unsafe fn wake_bespoke_waker(waker: &std::task::Waker) { 108 + // unsafe { 109 + // let guard = futures_compat::waker_to_guard(waker); 110 + // if let Some(wake) = guard.get() { 111 + // (*wake.as_ptr()).wake(); 112 + // } 113 + // } 114 + // }
+13
futures-compat/Cargo.toml
··· 1 + [package] 2 + name = "futures-compat" 3 + version.workspace = true 4 + rust-version.workspace = true 5 + edition.workspace = true 6 + license.workspace = true 7 + authors.workspace = true 8 + repository.workspace = true 9 + homepage.workspace = true 10 + 11 + [dependencies] 12 + futures-core = { workspace = true } 13 + lifetime-guard = { workspace = true }
+145
futures-compat/src/lib.rs
··· 1 + //! Any interaction between an executor/reactor intended for task::Future 2 + //! with an executor/reactor intended for bcsc::Future is strictly unsound. 3 + 4 + use std::{ 5 + hint::unreachable_unchecked, 6 + mem::ManuallyDrop, 7 + pin::Pin, 8 + ptr::NonNull, 9 + task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, 10 + }; 11 + 12 + use futures_core::Wake; 13 + use lifetime_guard::{atomic_guard::AtomicValueGuard, guard::ValueGuard}; 14 + 15 + pub type WakePtr = Option<NonNull<dyn Wake>>; 16 + pub type LocalWaker = ValueGuard<WakePtr>; 17 + pub type AtomicWaker = AtomicValueGuard<WakePtr>; 18 + 19 + static EVIL_VTABLE: RawWakerVTable = unsafe { 20 + RawWakerVTable::new( 21 + |_| unreachable_unchecked(), 22 + |_| unreachable_unchecked(), 23 + |_| unreachable_unchecked(), 24 + |_| unreachable_unchecked(), 25 + ) 26 + }; 27 + 28 + /// Coerces a pinned `ValueGuard` reference to a `Waker` for use in 29 + /// `core::future::Future` 30 + /// 31 + /// Any usage or storage of the resulting `Waker` is undefined behavior. 32 + pub unsafe fn guard_to_waker(guard: Pin<&LocalWaker>) -> ManuallyDrop<Waker> { 33 + ManuallyDrop::new(unsafe { 34 + Waker::from_raw(RawWaker::new( 35 + guard.get_ref() as *const ValueGuard<WakePtr> as *const (), 36 + &EVIL_VTABLE, 37 + )) 38 + }) 39 + } 40 + 41 + pub unsafe fn atomic_guard_to_waker( 42 + guard: Pin<&AtomicWaker>, 43 + ) -> ManuallyDrop<Waker> { 44 + ManuallyDrop::new(unsafe { 45 + Waker::from_raw(RawWaker::new( 46 + guard.get_ref() as *const AtomicValueGuard<WakePtr> as *const (), 47 + &EVIL_VTABLE, 48 + )) 49 + }) 50 + } 51 + 52 + /// Coerces a `Waker` into a pinned `AtomicValueGuard` reference. 53 + /// 54 + /// This should only be used to undo the work of `guard_to_waker`. 55 + pub unsafe fn waker_to_guard<'a>(waker: &Waker) -> Pin<&LocalWaker> { 56 + unsafe { 57 + Pin::new_unchecked(&*(waker.data() as *const ValueGuard<WakePtr>)) 58 + } 59 + } 60 + 61 + pub unsafe fn waker_to_atomic_guard<'a>(waker: &Waker) -> Pin<&AtomicWaker> { 62 + unsafe { 63 + Pin::new_unchecked(&*(waker.data() as *const AtomicValueGuard<WakePtr>)) 64 + } 65 + } 66 + 67 + pub unsafe fn std_future_to_bespoke<F: core::future::Future>( 68 + future: F, 69 + ) -> impl futures_core::Future<LocalWaker, Output = F::Output> { 70 + NormalFutureWrapper(future) 71 + } 72 + 73 + pub unsafe fn bespoke_future_to_std<F: futures_core::Future<LocalWaker>>( 74 + future: F, 75 + ) -> impl core::future::Future<Output = F::Output> { 76 + BespokeFutureWrapper(future) 77 + } 78 + 79 + /// wraps `core::future::Future` in impl of `bcsc:Future` 80 + #[repr(transparent)] 81 + pub struct NormalFutureWrapper<F: core::future::Future>(F); 82 + 83 + impl<F: core::future::Future> futures_core::Future<LocalWaker> 84 + for NormalFutureWrapper<F> 85 + { 86 + type Output = F::Output; 87 + 88 + fn poll( 89 + self: Pin<&mut Self>, 90 + waker: Pin<&LocalWaker>, 91 + ) -> Poll<Self::Output> { 92 + unsafe { 93 + self.map_unchecked_mut(|this| &mut this.0) 94 + .poll(&mut Context::from_waker(&guard_to_waker(waker))) 95 + } 96 + } 97 + } 98 + 99 + /// wraps custom `bcsc::Future` in impl of `core::future::Future` 100 + #[repr(transparent)] 101 + pub struct BespokeFutureWrapper<F>(F) 102 + where 103 + F: futures_core::Future<LocalWaker>; 104 + 105 + impl<F> core::future::Future for BespokeFutureWrapper<F> 106 + where 107 + F: futures_core::Future<LocalWaker>, 108 + { 109 + type Output = F::Output; 110 + 111 + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 112 + unsafe { 113 + self.map_unchecked_mut(|this| &mut this.0) 114 + .poll(waker_to_guard(cx.waker())) 115 + } 116 + } 117 + } 118 + 119 + #[cfg(test)] 120 + mod test { 121 + use std::pin; 122 + 123 + use super::*; 124 + use futures_core::Wake; 125 + 126 + #[derive(Debug)] 127 + struct DummyWake; 128 + impl Wake for DummyWake { 129 + fn wake(&self) {} 130 + } 131 + 132 + #[test] 133 + fn waker_conversion() { 134 + let dummy = DummyWake; 135 + let guard = pin::pin!(ValueGuard::new(NonNull::new( 136 + &dummy as *const dyn Wake as *mut dyn Wake 137 + ))); 138 + let waker = unsafe { guard_to_waker(guard.as_ref()) }; 139 + let guard = unsafe { waker_to_guard(&waker) }; 140 + assert_eq!( 141 + guard.get().unwrap().as_ptr() as *const () as usize, 142 + &dummy as *const _ as *const () as usize 143 + ); 144 + } 145 + }
+149 -39
futures-core/src/lib.rs
··· 1 - use std::task::Poll; 1 + //! Redefinitions of task::Future to be incompatible with them 2 2 3 - /// A task that can be woken. 4 - /// 5 - /// This acts as a handle for a reactor to indicate when a `ScopedFuture` is 6 - /// once again ready to be polled. 7 - pub trait Wake<'scope> { 8 - fn wake(&self); 9 - } 3 + use std::{ 4 + ops::{self, DerefMut}, 5 + pin::Pin, 6 + task::Poll, 7 + }; 10 8 11 - /// ScopedFuture represents a unit of asynchronous computation that must be 12 - /// polled by an external actor. 9 + /// A future represents an asynchronous computation obtained by use of `async`. 13 10 /// 14 - /// Implementations access a context (`cx: &'scope mut dyn Wake`) to signal 15 - /// they are ready to resume execution. 11 + /// This future assumes a nonstandard Context, which is incompatible with 12 + /// executors or reactors made for `core::future::Future`. In the interest of 13 + /// safety, it has a dedicated type. 16 14 /// 17 - /// A notable difference between `bcsc::ScopedFuture` and `core::task::Future` 18 - /// is the latter cannot safetly ran as a task by an executor without having a 19 - /// 'static lifetime. This is because there is no way for the compiler to 20 - /// guarantee the task doesn't outlive any data, as the executor is free to 21 - /// cancel it (or refuse to) whenever it wants. 15 + /// A future is a value that might not have finished computing yet. This kind of 16 + /// "asynchronous value" makes it possible for a thread to continue doing useful 17 + /// work while it waits for the value to become available. 22 18 /// 23 - /// Additionally, because raw/unsafe implementations of `core::task::Waker` 24 - /// effectively do lifetime-erasure, stack-allocated futures cannot prevent 25 - /// unsound behavior from wakers outliving them (even `Forget` would not 26 - /// entirely fix this due to the api). 19 + /// # The `poll` method 27 20 /// 28 - /// In order to avoid unsound behavior, executors must either use Weak<Wake> 29 - /// for safetly losing access to tasks or enforce tasks being stored in 30 - /// `static` pools of memory. 31 - /// 32 - /// `ScopedFuture` instead leverages the borrow checker to allow for (less 33 - /// powerful) stack based async execution. 21 + /// The core method of future, `poll`, *attempts* to resolve the future into a 22 + /// final value. This method does not block if the value is not ready. Instead, 23 + /// the current task is scheduled to be woken up when it's possible to make 24 + /// further progress by `poll`ing again. The `context` passed to the `poll` 25 + /// method can provide a [`Waker`], which is a handle for waking up the current 26 + /// task. 34 27 /// 35 - /// some more: 36 - /// what occurs in `core::task::Future::poll()` is that the ref to a cx.waker 37 - /// is cloned and stored by a reactor via some method. 28 + /// When using a future, you generally won't call `poll` directly, but instead 29 + /// `.await` the value. 38 30 /// 39 - /// The waker is no longer tied to the actual future's lifetime, making it 40 - /// unsound to not have either static tasks or reference counting. 41 - /// To avoid this, we want to use a &'scope waker instead, with 1 waker / task. 42 - pub trait ScopedFuture<'scope> { 31 + /// [`Waker`]: crate::task::Waker 32 + #[must_use = "futures do nothing unless you `.await` or poll them"] 33 + #[diagnostic::on_unimplemented( 34 + label = "`{Self}` is not a `bcsc::Future`", 35 + message = "`{Self}` is not a `bcsc::Future`", 36 + note = "If you are trying to await a `core::future::Future` from within a `bcsc::Future`, note that the systems are incompatible." 37 + )] 38 + pub trait Future<Waker> { 39 + /// The type of value produced on completion. 43 40 type Output; 44 41 45 - /// as soon as poll is called, the struct becomes self-referential, 46 - /// effectively pinned until dropped (or forgotten....D; ) 42 + /// Attempts to resolve the future to a final value, registering 43 + /// the current task for wakeup if the value is not yet available. 44 + /// 45 + /// # Return value 46 + /// 47 + /// This function returns: 48 + /// 49 + /// - [`Poll::Pending`] if the future is not ready yet 50 + /// - [`Poll::Ready(val)`] with the result `val` of this future if it 51 + /// finished successfully. 52 + /// 53 + /// Once a future has finished, clients should not `poll` it again. 54 + /// 55 + /// When a future is not ready yet, `poll` returns `Poll::Pending` and 56 + /// stores a clone of the [`Waker`] copied from the current [`Context`]. 57 + /// This [`Waker`] is then woken once the future can make progress. 58 + /// For example, a future waiting for a socket to become 59 + /// readable would call `.clone()` on the [`Waker`] and store it. 60 + /// When a signal arrives elsewhere indicating that the socket is readable, 61 + /// [`Waker::wake`] is called and the socket future's task is awoken. 62 + /// Once a task has been woken up, it should attempt to `poll` the future 63 + /// again, which may or may not produce a final value. 64 + /// 65 + /// Note that on multiple calls to `poll`, only the [`Waker`] from the 66 + /// [`Context`] passed to the most recent call should be scheduled to 67 + /// receive a wakeup. 68 + /// 69 + /// # Runtime characteristics 70 + /// 71 + /// Futures alone are *inert*; they must be *actively* `poll`ed to make 72 + /// progress, meaning that each time the current task is woken up, it should 73 + /// actively re-`poll` pending futures that it still has an interest in. 74 + /// 75 + /// The `poll` function is not called repeatedly in a tight loop -- instead, 76 + /// it should only be called when the future indicates that it is ready to 77 + /// make progress (by calling `wake()`). If you're familiar with the 78 + /// `poll(2)` or `select(2)` syscalls on Unix it's worth noting that futures 79 + /// typically do *not* suffer the same problems of "all wakeups must poll 80 + /// all events"; they are more like `epoll(4)`. 81 + /// 82 + /// An implementation of `poll` should strive to return quickly, and should 83 + /// not block. Returning quickly prevents unnecessarily clogging up 84 + /// threads or event loops. If it is known ahead of time that a call to 85 + /// `poll` may end up taking a while, the work should be offloaded to a 86 + /// thread pool (or something similar) to ensure that `poll` can return 87 + /// quickly. 88 + /// 89 + /// # Panics 90 + /// 91 + /// Once a future has completed (returned `Ready` from `poll`), calling its 92 + /// `poll` method again may panic, block forever, or cause other kinds of 93 + /// problems; the `Future` trait places no requirements on the effects of 94 + /// such a call. However, as the `poll` method is not marked `unsafe`, 95 + /// Rust's usual rules apply: calls must never cause undefined behavior 96 + /// (memory corruption, incorrect use of `unsafe` functions, or the like), 97 + /// regardless of the future's state. 98 + /// 99 + /// [`Poll::Ready(val)`]: Poll::Ready 100 + /// [`Waker`]: crate::task::Waker 101 + /// [`Waker::wake`]: crate::task::Waker::wake 102 + fn poll(self: Pin<&mut Self>, waker: Pin<&Waker>) -> Poll<Self::Output>; 103 + } 104 + 105 + impl<Waker, F: ?Sized + Future<Waker> + Unpin> Future<Waker> for &mut F { 106 + type Output = F::Output; 107 + 47 108 fn poll( 48 - self: &'scope Self, 49 - wake: &'scope dyn Wake<'scope>, 50 - ) -> Poll<Self::Output>; 109 + mut self: Pin<&mut Self>, 110 + waker: Pin<&Waker>, 111 + ) -> Poll<Self::Output> { 112 + F::poll(Pin::new(&mut **self), waker) 113 + } 114 + } 115 + 116 + impl<Waker, P> Future<Waker> for Pin<P> 117 + where 118 + P: ops::DerefMut<Target: Future<Waker>>, 119 + { 120 + type Output = <<P as ops::Deref>::Target as Future<Waker>>::Output; 121 + 122 + fn poll(self: Pin<&mut Self>, waker: Pin<&Waker>) -> Poll<Self::Output> { 123 + <P::Target as Future<Waker>>::poll(self.as_deref_mut(), waker) 124 + } 125 + } 126 + 127 + /// A future which tracks whether or not the underlying future 128 + /// should no longer be polled. 129 + /// 130 + /// `is_terminated` will return `true` if a future should no longer be polled. 131 + /// Usually, this state occurs after `poll` (or `try_poll`) returned 132 + /// `Poll::Ready`. However, `is_terminated` may also return `true` if a future 133 + /// has become inactive and can no longer make progress and should be ignored 134 + /// or dropped rather than being `poll`ed again. 135 + pub trait FusedFuture<Waker>: Future<Waker> { 136 + /// Returns `true` if the underlying future should no longer be polled. 137 + fn is_terminated(&self) -> bool; 138 + } 139 + 140 + impl<Waker, F: FusedFuture<Waker> + ?Sized + Unpin> FusedFuture<Waker> 141 + for &mut F 142 + { 143 + fn is_terminated(&self) -> bool { 144 + <F as FusedFuture<Waker>>::is_terminated(&**self) 145 + } 146 + } 147 + 148 + impl<Waker, P> FusedFuture<Waker> for Pin<P> 149 + where 150 + P: DerefMut + Unpin, 151 + P::Target: FusedFuture<Waker>, 152 + { 153 + fn is_terminated(&self) -> bool { 154 + <P::Target as FusedFuture<Waker>>::is_terminated(&**self) 155 + } 156 + } 157 + 158 + /// temporary trait until Fn::call is stabilized 159 + pub trait Wake { 160 + fn wake(&self); 51 161 }
+17
futures-derive/Cargo.toml
··· 1 + [lib] 2 + proc-macro = true 3 + 4 + [package] 5 + name = "futures-derive" 6 + version.workspace = true 7 + rust-version.workspace = true 8 + edition.workspace = true 9 + license.workspace = true 10 + authors.workspace = true 11 + repository.workspace = true 12 + homepage.workspace = true 13 + 14 + [dependencies] 15 + proc-macro2 = "1.0" 16 + quote = "1.0" 17 + syn = { version = "2.0", features = ["full", "visit-mut" ] }
+212
futures-derive/src/lib.rs
··· 1 + use proc_macro::TokenStream; 2 + use quote::{ToTokens, quote}; 3 + use syn::{ 4 + Expr, ExprAwait, ItemFn, ReturnType, parse_macro_input, parse_quote, 5 + parse2, visit_mut::VisitMut, 6 + }; 7 + 8 + /// Takes async fn that returns anonymous `Future` impl. 9 + /// Generates fn that returns `UnscopedFutureWrapper` wrapper for the the anonymous `Future` impl. 10 + /// 11 + /// ```rust,ignore 12 + /// fn my_func<'a, 'b>(a: &'a A, b: &'b B) -> impl ScopedFuture<LifetimeGuard, Output = Output> { 13 + /// let output = async move { [body] } // compilers turns this into -> impl Future<Output = T> + 'a + 'b 14 + /// unsafe { futures_compat::std_future_to_bespoke(output) } 15 + /// } 16 + /// ``` 17 + /// 18 + /// see https://rust-lang.github.io/rfcs/2394-async_await.html#lifetime-capture-in-the-anonymous-future 19 + /// for more context on lifetime capture 20 + /// - resulting ScopedFuture needs to be constrained to not outlive the lifetimes of any references 21 + /// 22 + /// to actually implement this (capture all lifetimes) we use `ScopedFuture<'_> + '_` so the compiler can infer 23 + /// lifetimes from the anonymous future impl returned by the actual inner async fn 24 + #[proc_macro_attribute] 25 + pub fn async_function(_: TokenStream, item: TokenStream) -> TokenStream { 26 + let mut item_fn = parse_macro_input!(item as ItemFn); 27 + // Wraps *every* async expression within the function block with 28 + // `BespokeFutureWrapper`, allowing them to be treated as regular `Future` 29 + // impls. 30 + // 31 + // This will cause a compiler error if any expression being awaited is not 32 + // a `ScopedFuture`, which is intentional because the `Future` and 33 + // `ScopedFuture` systems are incompatible. 34 + BespokeFutureWrappingVisitor.visit_item_fn_mut(&mut item_fn); 35 + 36 + // disable async since it is moved to the block 37 + item_fn.sig.asyncness = None; 38 + 39 + // wrap block with UnscopedFutureWrapper 40 + let block = *item_fn.block; 41 + *item_fn.block = parse_quote! { 42 + { 43 + let future = async move #block; 44 + unsafe { futures_compat::std_future_to_bespoke(future) } 45 + } 46 + }; 47 + 48 + let output_type = match &item_fn.sig.output { 49 + ReturnType::Default => quote! { () }, 50 + ReturnType::Type(_, ty) => quote! { #ty }, 51 + }; 52 + 53 + item_fn.sig.output = parse_quote! { -> impl futures_core::Future<LocalWaker, Output = #output_type> }; 54 + 55 + // let has_lifetime_dependency = 56 + // item_fn.sig.inputs.iter().any(|param| match param { 57 + // FnArg::Receiver(receiver) => receiver.reference.is_some(), 58 + // FnArg::Typed(pat) => has_lifetime_dependency(&pat.ty), 59 + // }); 60 + 61 + // // set outer fn output to ScopedFuture<'_/'static, Output = #output> 62 + // item_fn.sig.output = if has_lifetime_dependency { 63 + // parse_quote! { -> impl futures_core::ScopedFuture<'_, Output = #output> + '_ } 64 + // } else { 65 + // parse_quote! { -> impl futures_core::ScopedFuture<'static, Output = #output> } 66 + // }; 67 + 68 + item_fn.to_token_stream().into() 69 + } 70 + 71 + /// This currently is impossible to do the `futures_compat` workarounds not 72 + /// being compatible with closures. 73 + /// 74 + /// Takes async fn that returns anonymous `Future` impl. 75 + /// Generates fn that returns `UnscopedFutureWrapper` wrapper for the the anonymous `Future` impl. 76 + /// 77 + /// ```rust,ignore 78 + /// fn [original name]<'a, 'b>(a: &'a A, b: &'b B) -> impl ScopedFuture<'a + 'b, Output = T> + 'a + 'b { 79 + /// async fn [__inner]<'a, 'b>(a: &'a A, b: &'b B) -> T { [body] } // compilers turns this into -> impl Future<Output = T> + 'a + 'b 80 + /// unsafe { UnscopedFutureWrapper::from_future(__inner()) } 81 + /// } 82 + /// ``` 83 + /// 84 + /// see https://rust-lang.github.io/rfcs/2394-async_await.html#lifetime-capture-in-the-anonymous-future 85 + /// for more context on lifetime capture 86 + /// - resulting ScopedFuture needs to be constrained to not outlive the lifetimes of any references 87 + /// 88 + /// to actually implement this (capture all lifetimes) we use `ScopedFuture<'_> + '_` so the compiler can infer 89 + /// lifetimes from the anonymous future impl returned by the actual inner async fn 90 + // #[proc_macro] 91 + // pub fn closure(input: TokenStream) -> TokenStream { 92 + // // let ExprClosure { 93 + // // attrs, 94 + // // lifetimes, 95 + // // constness, 96 + // // movability, 97 + // // capture, 98 + // // inputs, 99 + // // output, 100 + // // body, 101 + // // .. 102 + // // } = parse_macro_input!(input as ExprClosure); 103 + // let mut closure = parse_macro_input!(input as ExprClosure); 104 + // // disable async because we move it to inner 105 + // closure.asyncness = None; 106 + // let body = closure.body; 107 + 108 + // // let output = match closure.output { 109 + // // ReturnType::Default => parse_quote! { () }, 110 + // // ReturnType::Type(_, ty) => parse_quote! { #ty }, 111 + // // }; 112 + 113 + // // let outer_output = 114 + // // parse_quote! { futures_core::ScopedFuture<'_, Output = #output> + '_ }; 115 + 116 + // closure.body = parse_quote! {{ 117 + // let output = async move { #body }; 118 + // unsafe { futures_compat::UnscopedFutureWrapper::from_future(output) } 119 + // }}; 120 + // // closure.output = outer_output; 121 + // closure.to_token_stream().into() 122 + // } 123 + 124 + /// Wraps a block of optionally async statements and expressions in an anonymous `ScopedFuture` impl. 125 + /// 126 + /// This generates a modified block of the form: 127 + /// 128 + /// ```rust,ignore 129 + /// { 130 + /// let output = async { <original block, mapped to convert all `ScopedFuture` to `Future`> }; 131 + /// unsafe { futures_compat::UnscopedFutureWrapper::from_future(output) } 132 + /// } 133 + /// ``` 134 + #[proc_macro] 135 + pub fn async_block(input: TokenStream) -> TokenStream { 136 + // block is formed { **expr/stmt }, so we need to surround the inputs in {} 137 + let input = proc_macro2::TokenStream::from(input); 138 + let block_input = quote! { { #input } }; 139 + 140 + let mut block = parse2(block_input).expect("Failed to parse as block."); 141 + 142 + BespokeFutureWrappingVisitor.visit_block_mut(&mut block); 143 + 144 + quote! { 145 + { 146 + let output = async #block; 147 + unsafe { futures_compat::std_future_to_bespoke(output) } 148 + } 149 + } 150 + .into() 151 + } 152 + 153 + /// Determines if typed pattern contains a reference or dependency on a 154 + /// lifetime (used for deciding between '_ and 'static ScopedFuture). 155 + // fn has_lifetime_dependency(ty: &syn::Type) -> bool { 156 + // match ty { 157 + // syn::Type::Reference(_) => true, 158 + // syn::Type::Path(type_path) => { 159 + // type_path.path.segments.iter().any(|segment| { 160 + // if let syn::PathArguments::AngleBracketed(args) = 161 + // &segment.arguments 162 + // { 163 + // args.args.iter().any(|arg| match arg { 164 + // GenericArgument::Type(ty) => { 165 + // has_lifetime_dependency(ty) 166 + // } 167 + // syn::GenericArgument::Lifetime(_) => true, 168 + // _ => false, 169 + // }) 170 + // } else { 171 + // false 172 + // } 173 + // }) 174 + // } 175 + // syn::Type::Tuple(tuple) => { 176 + // tuple.elems.iter().any(has_lifetime_dependency) 177 + // } 178 + // syn::Type::Slice(slice) => has_lifetime_dependency(&slice.elem), 179 + // syn::Type::Array(array) => has_lifetime_dependency(&array.elem), 180 + // syn::Type::Ptr(ptr) => has_lifetime_dependency(&ptr.elem), 181 + // syn::Type::Group(group) => has_lifetime_dependency(&group.elem), 182 + // syn::Type::Paren(paren) => has_lifetime_dependency(&paren.elem), 183 + // syn::Type::BareFn(bare_fn) => { 184 + // bare_fn 185 + // .inputs 186 + // .iter() 187 + // .any(|input| has_lifetime_dependency(&input.ty)) 188 + // || match &bare_fn.output { 189 + // ReturnType::Default => false, 190 + // ReturnType::Type(_, ty) => has_lifetime_dependency(ty), 191 + // } 192 + // } 193 + 194 + // _ => false, 195 + // } 196 + // } 197 + 198 + /// Uses the `syn::visit_mut` api to wrap every `.await` expression in 199 + /// `ScopedFutureWrapper`. 200 + struct BespokeFutureWrappingVisitor; 201 + 202 + impl VisitMut for BespokeFutureWrappingVisitor { 203 + fn visit_expr_mut(&mut self, expr: &mut syn::Expr) { 204 + if let Expr::Await(ExprAwait { attrs, base, .. }) = expr { 205 + *expr = syn::parse_quote! { 206 + unsafe { futures_compat::bespoke_future_to_std(#(#attrs)* #base) }.await 207 + }; 208 + } 209 + 210 + syn::visit_mut::visit_expr_mut(self, expr); 211 + } 212 + }
+2 -1
futures-util/Cargo.toml
··· 9 9 homepage.workspace = true 10 10 11 11 [dependencies] 12 - futures-core = { path = "../futures-core" } 12 + futures-core = { workspace = true } 13 + lifetime-guard = { workspace = true }
+17
futures-util/src/block_on.rs
··· 1 + use std::{ 2 + pin::{self, Pin}, 3 + task::Poll, 4 + }; 5 + 6 + use crate::{LocalWaker, dummy_guard}; 7 + 8 + pub fn block_on<F: futures_core::Future<LocalWaker>>( 9 + mut f: Pin<&mut F>, 10 + ) -> F::Output { 11 + let dummy_guard = pin::pin!(dummy_guard()); 12 + loop { 13 + if let Poll::Ready(out) = f.as_mut().poll(dummy_guard.as_ref()) { 14 + return out; 15 + } 16 + } 17 + }
+49 -9
futures-util/src/lib.rs
··· 1 - mod maybe_done; 2 - mod poll_fn; 1 + use std::{pin::Pin, ptr::NonNull, task::Poll}; 2 + 3 + use futures_core::{Future, Wake}; 4 + use lifetime_guard::{atomic_guard::AtomicValueGuard, guard::ValueGuard}; 5 + 6 + pub mod block_on; 7 + pub mod maybe_done; 3 8 4 - use futures_core::ScopedFuture; 5 - pub use maybe_done::*; 6 - pub use poll_fn::poll_fn; 9 + pub type WakePtr = Option<NonNull<dyn Wake>>; 10 + pub type LocalWaker = ValueGuard<WakePtr>; 11 + pub type AtomicWaker = AtomicValueGuard<WakePtr>; 7 12 8 - // Just a helper function to ensure the futures we're returning all have the 9 - // right implementations. 10 - pub(crate) fn assert_future<'scope, T, F>(future: F) -> F 13 + pub(crate) fn assert_future<T, F>(future: F) -> F 11 14 where 12 - F: ScopedFuture<'scope, Output = T>, 15 + F: Future<LocalWaker, Output = T>, 13 16 { 14 17 future 15 18 } 19 + 20 + pub struct PollFn<F, T>(F) 21 + where 22 + F: FnMut(&LocalWaker) -> Poll<T>; 23 + 24 + impl<F, T> futures_core::Future<LocalWaker> for PollFn<F, T> 25 + where 26 + F: FnMut(&LocalWaker) -> Poll<T>, 27 + { 28 + type Output = T; 29 + 30 + fn poll( 31 + self: Pin<&mut Self>, 32 + waker: Pin<&LocalWaker>, 33 + ) -> Poll<Self::Output> { 34 + (unsafe { &mut self.get_unchecked_mut().0 })(&waker) 35 + } 36 + } 37 + 38 + pub fn poll_fn<F, T>(f: F) -> impl futures_core::Future<LocalWaker, Output = T> 39 + where 40 + F: FnMut(&LocalWaker) -> Poll<T>, 41 + { 42 + PollFn(f) 43 + } 44 + 45 + pub struct DummyWaker; 46 + 47 + impl Wake for DummyWaker { 48 + fn wake(&self) { 49 + dbg!("awake!"); 50 + } 51 + } 52 + 53 + pub fn dummy_guard() -> ValueGuard<WakePtr> { 54 + ValueGuard::new(NonNull::new(&mut DummyWaker as *mut dyn Wake)) 55 + }
+60 -60
futures-util/src/maybe_done.rs
··· 1 1 //! Definition of the MaybeDone combinator 2 2 3 - use futures_core::{ScopedFuture, Wake}; 3 + use futures_core::FusedFuture; 4 + 5 + use crate::LocalWaker; 4 6 5 7 use super::assert_future; 6 - use std::{ 7 - cell::UnsafeCell, 8 - task::{Poll, ready}, 9 - }; 10 - 11 - #[must_use = "futures do nothing unless you `.await` or poll them"] 12 - pub struct MaybeDone<'scope, Fut: ScopedFuture<'scope>> { 13 - state: UnsafeCell<MaybeDoneState<'scope, Fut>>, 14 - } 8 + use core::mem; 9 + use core::pin::Pin; 10 + use futures_core::Future; 11 + use std::task::Poll; 12 + use std::task::ready; 15 13 16 14 /// A future that may have completed. 17 15 /// 18 16 /// This is created by the [`maybe_done()`] function. 19 17 #[derive(Debug)] 20 - pub enum MaybeDoneState<'scope, Fut: ScopedFuture<'scope>> { 18 + pub enum MaybeDone<Fut: futures_core::Future<LocalWaker>> { 21 19 /// A not-yet-completed future 22 - Future(Fut), 20 + Future(/* #[pin] */ Fut), 23 21 /// The output of the completed future 24 22 Done(Fut::Output), 25 23 /// The empty variant after the result of a [`MaybeDone`] has been ··· 27 25 Gone, 28 26 } 29 27 28 + impl<Fut: Future<LocalWaker> + Unpin> Unpin for MaybeDone<Fut> {} 29 + 30 30 /// Wraps a future into a `MaybeDone` 31 - /// 32 - /// # Examples 33 - /// 34 - /// ``` 35 - /// # futures::executor::block_on(async { 36 - /// use core::pin::pin; 37 - /// 38 - /// use futures::future; 39 - /// 40 - /// let future = future::maybe_done(async { 5 }); 41 - /// let mut future = pin!(future); 42 - /// assert_eq!(future.as_mut().take_output(), None); 43 - /// let () = future.as_mut().await; 44 - /// assert_eq!(future.as_mut().take_output(), Some(5)); 45 - /// assert_eq!(future.as_mut().take_output(), None); 46 - /// # }); 47 - /// ``` 48 - pub fn maybe_done<'scope, Fut: ScopedFuture<'scope>>( 31 + pub fn maybe_done<Fut: futures_core::Future<LocalWaker>>( 49 32 future: Fut, 50 - ) -> MaybeDone<'scope, Fut> { 51 - assert_future::<(), _>(MaybeDone { 52 - state: MaybeDoneState::Future(future).into(), 53 - }) 33 + ) -> MaybeDone<Fut> { 34 + assert_future::<(), _>(MaybeDone::Future(future)) 54 35 } 55 36 56 - impl<'scope, Fut: ScopedFuture<'scope>> MaybeDone<'scope, Fut> { 37 + impl<Fut: Future<LocalWaker>> MaybeDone<Fut> { 38 + /// Returns an [`Option`] containing a mutable reference to the output of the future. 39 + /// The output of this method will be [`Some`] if and only if the inner 40 + /// future has been completed and [`take_output`](MaybeDone::take_output) 41 + /// has not yet been called. 42 + #[inline] 43 + pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> { 44 + unsafe { 45 + match self.get_unchecked_mut() { 46 + Self::Done(res) => Some(res), 47 + _ => None, 48 + } 49 + } 50 + } 51 + 57 52 /// Attempt to take the output of a `MaybeDone` without driving it 58 53 /// towards completion. 59 54 #[inline] 60 - pub fn take_output(&self) -> Option<Fut::Output> { 61 - match unsafe { &*self.state.get() } { 62 - MaybeDoneState::Done(_) => {} 63 - MaybeDoneState::Future(_) | MaybeDoneState::Gone => return None, 55 + pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> { 56 + match &*self { 57 + Self::Done(_) => {} 58 + Self::Future(_) | Self::Gone => return None, 64 59 } 65 - match unsafe { self.state.get().replace(MaybeDoneState::Gone) } { 66 - MaybeDoneState::Done(output) => Some(output), 67 - _ => unreachable!(), 60 + unsafe { 61 + match mem::replace(self.get_unchecked_mut(), Self::Gone) { 62 + Self::Done(output) => Some(output), 63 + _ => unreachable!(), 64 + } 68 65 } 69 66 } 67 + } 70 68 71 - /// Returns an immutable reference to the internal state of this future 72 - /// 73 - /// # Safety 74 - /// You must not hold this reference past any use of any other methods of this struct 75 - pub unsafe fn get_state(&self) -> &MaybeDoneState<'scope, Fut> { 76 - unsafe { &*self.state.get() } 69 + impl<Fut: Future<LocalWaker>> FusedFuture<LocalWaker> for MaybeDone<Fut> { 70 + fn is_terminated(&self) -> bool { 71 + match self { 72 + Self::Future(_) => false, 73 + Self::Done(_) | Self::Gone => true, 74 + } 77 75 } 78 76 } 79 77 80 - impl<'scope, Fut: ScopedFuture<'scope>> ScopedFuture<'scope> 81 - for MaybeDone<'scope, Fut> 78 + impl<Fut: Future<LocalWaker>> futures_core::Future<LocalWaker> 79 + for MaybeDone<Fut> 82 80 { 83 81 type Output = (); 84 82 85 - fn poll(&'scope self, cx: &'scope dyn Wake<'scope>) -> Poll<Self::Output> { 86 - match unsafe { &*self.state.get() } { 87 - MaybeDoneState::Future(f) => { 88 - let res = ready!(f.poll(cx)); 89 - // this is fine because no immutable references currently exist 90 - unsafe { self.state.get().replace(MaybeDoneState::Done(res)) }; 91 - } 92 - MaybeDoneState::Done(_) => {} 93 - MaybeDoneState::Gone => { 94 - panic!("MaybeDone polled after value taken") 83 + fn poll( 84 + mut self: Pin<&mut Self>, 85 + waker: Pin<&LocalWaker>, 86 + ) -> Poll<Self::Output> { 87 + unsafe { 88 + match self.as_mut().get_unchecked_mut() { 89 + Self::Future(f) => { 90 + let res = ready!(Pin::new_unchecked(f).poll(waker)); 91 + self.set(Self::Done(res)); 92 + } 93 + Self::Done(_) => {} 94 + Self::Gone => panic!("MaybeDone polled after value taken"), 95 95 } 96 96 } 97 97 Poll::Ready(())
-55
futures-util/src/poll_fn.rs
··· 1 - use core::fmt; 2 - use std::task::Poll; 3 - 4 - use futures_core::{ScopedFuture, Wake}; 5 - 6 - use crate::assert_future; 7 - 8 - /// Future for the [`poll_fn`] function. 9 - #[must_use = "futures do nothing unless you `.await` or poll them"] 10 - pub struct PollFn<F> { 11 - f: F, 12 - } 13 - 14 - /// Creates a new future wrapping around a function returning [`Poll`]. 15 - /// 16 - /// Polling the returned future delegates to the wrapped function. 17 - /// 18 - /// # Examples 19 - /// 20 - /// ``` 21 - /// # futures::executor::block_on(async { 22 - /// use futures::future::poll_fn; 23 - /// use futures::task::{Context, Poll}; 24 - /// 25 - /// fn read_line(_cx: &mut Context<'_>) -> Poll<String> { 26 - /// Poll::Ready("Hello, World!".into()) 27 - /// } 28 - /// 29 - /// let read_future = poll_fn(read_line); 30 - /// assert_eq!(read_future.await, "Hello, World!".to_owned()); 31 - /// # }); 32 - /// ``` 33 - pub fn poll_fn<'scope, T, F>(f: F) -> PollFn<F> 34 - where 35 - F: Fn(&'scope dyn Wake) -> Poll<T>, 36 - { 37 - assert_future::<T, _>(PollFn { f }) 38 - } 39 - 40 - impl<F> fmt::Debug for PollFn<F> { 41 - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 42 - f.debug_struct("PollFn").finish() 43 - } 44 - } 45 - 46 - impl<'scope, T, F> ScopedFuture<'scope> for PollFn<F> 47 - where 48 - F: Fn(&'scope dyn Wake) -> Poll<T>, 49 - { 50 - type Output = T; 51 - 52 - fn poll(&self, wake: &'scope dyn Wake) -> Poll<T> { 53 - (&self.f)(wake) 54 - } 55 - }
+22
lifetime-guard/Cargo.toml
··· 1 + [package] 2 + name = "lifetime-guard" 3 + version.workspace = true 4 + authors.workspace = true 5 + edition.workspace = true 6 + rust-version.workspace = true 7 + description = "create weak/strong reference pairs to interior mutable data on the stack" 8 + homepage.workspace = true 9 + repository.workspace = true 10 + license.workspace = true 11 + keywords = [ "weak_ptr", "value_guard", "no_std" ] 12 + categories = [ "data-structures", "no-std::no-alloc" ] 13 + 14 + [features] 15 + default = ["atomics"] 16 + atomics = ["dep:critical-section"] 17 + 18 + [target.'cfg(loom)'.dependencies] 19 + loom = "0.7" 20 + 21 + [dependencies] 22 + critical-section = { version = "1.1", features = ["std"], optional = true }
+50
lifetime-guard/README.md
··· 1 + # Lifetime Guard 2 + 3 + `lifetime-guard` provides `ValueGuard` and `RefGuard` structs to allow for 4 + weak references to interior mutable values, similar to a singular pair of 5 + `Rc` and `Weak`, but without heap allocation. 6 + 7 + For parallelism, it provides `AtomicValueGuard` and `AtomicRefGuard` that 8 + implement `Send`. 9 + 10 + ## Example Usage 11 + 12 + ```rust 13 + use std::pin; 14 + use lifetime_guard::guard::*; 15 + 16 + let weak = pin::pin!(RefGuard::new()); 17 + { 18 + let strong = pin::pin!(ValueGuard::new(0)); 19 + weak.as_ref().register(strong.as_ref()); 20 + 21 + assert_eq!(strong.get(), 0); 22 + assert_eq!(weak.get(), Some(0)); 23 + 24 + strong.as_ref().set(1); 25 + assert_eq!(strong.get(), 1); 26 + assert_eq!(weak.get(), Some(1)); 27 + } 28 + assert_eq!(weak.get(), None); 29 + ``` 30 + 31 + # Safety 32 + 33 + You *may not* leak any instance of either `ValueGuard` or `RefGuard` to the 34 + stack using `mem::forget()` or any other mechanism that causes their 35 + contents to be overwritten without `Drop::drop()` running. 36 + Doing so creates unsoundness that likely will lead to dereferencing a null 37 + pointer. 38 + 39 + Doing so creates unsoundness that likely will lead to dereferencing a null 40 + pointer. See the 41 + [Forget marker trait](https://github.com/rust-lang/rfcs/pull/3782) rfc for 42 + progress on making interfaces that rely on not being leaked sound. 43 + 44 + Note that it is sound to leak `ValueGuard` and `RefGuard` to the heap using 45 + methods including `Box::leak()` because heap allocated data will never be 46 + overwritten if it is never freed. 47 + 48 + The test cases for this library have been verified to not exhibit undefined 49 + behavior using [miri](https://github.com/rust-lang/miri). 50 +
+255
lifetime-guard/src/atomic_guard.rs
··· 1 + use core::{cell::Cell, marker::PhantomPinned, pin::Pin, ptr::NonNull}; 2 + 3 + use critical_section::Mutex; 4 + 5 + struct RawValueGuard<T> { 6 + /// Contains the value being immutably accessed by `RefGuard` and 7 + /// mutably accessed by `Self` 8 + /// 9 + /// This needs to be a cell so that the original immutable alias 10 + /// to `Self` (given to `RefGuard`) can continue to be referenced after 11 + /// invalidated by the creation of a mutable alias for `Self::set`. 12 + data: Cell<T>, 13 + /// A pointer to a `RefGuard` with read access to `data` to invalidate that 14 + /// `RefGuard` when `Self` is dropped. 15 + ref_guard: Cell<Option<NonNull<AtomicRefGuard<T>>>>, 16 + } 17 + 18 + /// Strong guard for granting read access to a single interior mutable value to 19 + /// [`RefGuard`](RefGuard). 20 + /// 21 + /// A `ValueGuard`:`RefGuard` relationship is exclusive, and behaves similarly 22 + /// to a single `Rc` and `Weak` pair, but notably does not require heap 23 + /// allocation. 24 + /// 25 + /// # Safety 26 + /// 27 + /// This struct *must* not be leaked to the stack using `mem::forget` or any 28 + /// other mechanism that causes the contents of `Self` to be overwritten 29 + /// without `Drop::drop()` running. 30 + /// Doing so creates unsoundness that likely will lead to dereferencing a null 31 + /// pointer. 32 + /// 33 + /// Note that it is sound to leak `Self` to the heap using methods including 34 + /// `Box::leak()` because heap allocated data will never be overwritten if it 35 + /// is never freed. 36 + pub struct AtomicValueGuard<T> { 37 + /// Mutex is unfortunately necessary because the replace operation requires 38 + /// confirming if the ptr is valid, meaning it's a two instruction process 39 + /// and can't be done with atomic compare-and-swap instructions 40 + mutex: Mutex<RawValueGuard<T>>, 41 + _marker: PhantomPinned, 42 + } 43 + 44 + impl<T> AtomicValueGuard<T> { 45 + /// Creates a new `ValueGuard` containing `data`. 46 + #[inline] 47 + pub fn new(data: T) -> Self { 48 + Self { 49 + mutex: Mutex::new(RawValueGuard { 50 + data: Cell::new(data), 51 + ref_guard: Cell::new(None), 52 + }), 53 + _marker: PhantomPinned, 54 + } 55 + } 56 + 57 + /// Sets the internal value stored by `Self`. 58 + #[inline] 59 + pub fn set(&self, value: T) { 60 + critical_section::with(|cs| self.mutex.borrow(cs).data.set(value)); 61 + } 62 + 63 + #[inline] 64 + fn invalidate_ref_guard(&self) { 65 + critical_section::with(|cs| self.mutex.borrow(cs).ref_guard.set(None)); 66 + } 67 + 68 + #[inline] 69 + fn replace_ref_guard(&self, ref_guard: Option<NonNull<AtomicRefGuard<T>>>) { 70 + critical_section::with(|cs| { 71 + if let Some(guard) = 72 + self.mutex.borrow(cs).ref_guard.replace(ref_guard) 73 + { 74 + unsafe { (*guard.as_ptr()).invalidate_value_guard() } 75 + } 76 + }); 77 + } 78 + } 79 + 80 + impl<T: Copy> AtomicValueGuard<T> { 81 + /// Gets a copy of the value stored inside this `ValueGuard`. 82 + #[inline] 83 + pub fn get(&self) -> T { 84 + critical_section::with(|cs| self.mutex.borrow(cs).data.get()) 85 + } 86 + } 87 + 88 + impl<T> Drop for AtomicValueGuard<T> { 89 + #[inline] 90 + fn drop(&mut self) { 91 + self.replace_ref_guard(None); 92 + } 93 + } 94 + 95 + /// Weak guard for acquiring read only access to a `ValueGuard`'s value. 96 + /// 97 + /// Provides [`WeakGuard::register()`](Self::register) to register a `ValueGuard` 98 + /// to `Self` and vice versa. 99 + /// 100 + /// # Safety 101 + /// 102 + /// This struct *must* not be leaked to the stack using `mem::forget` or any 103 + /// other mechanism that causes the contents of `Self` to be overwritten 104 + /// without `Drop::drop()` running. 105 + /// Doing so creates unsoundness that likely will lead to dereferencing a null 106 + /// pointer. 107 + /// 108 + /// Note that it is sound to leak `Self` to the heap using methods including 109 + /// `Box::leak()` because heap allocated data will never be overwritten if it 110 + /// is never freed. 111 + pub struct AtomicRefGuard<T> { 112 + value_guard: Cell<Option<NonNull<AtomicValueGuard<T>>>>, 113 + _marker: PhantomPinned, 114 + } 115 + 116 + impl<T> AtomicRefGuard<T> { 117 + /// Creates a new `RefGuard` with no reference to a `ValueGuard`. 118 + #[inline] 119 + pub fn new() -> Self { 120 + Self { 121 + value_guard: Cell::new(None), 122 + _marker: PhantomPinned, 123 + } 124 + } 125 + 126 + #[inline] 127 + fn invalidate_value_guard(&self) { 128 + self.value_guard.set(None); 129 + } 130 + 131 + #[inline] 132 + fn replace_value_guard( 133 + &self, 134 + value_guard: Option<NonNull<AtomicValueGuard<T>>>, 135 + ) { 136 + if let Some(guard) = self.value_guard.replace(value_guard) { 137 + unsafe { (*guard.as_ptr()).invalidate_ref_guard() } 138 + } 139 + } 140 + 141 + /// Binds a pinned `value_guard` to `self`. 142 + /// 143 + /// This means they will reference each other, and will invalidate their 144 + /// references to each other when dropped. 145 + /// 146 + /// This method also invalidates the existing references held by the 147 + /// now-replaced referencees of `self` and `value_guard` to avoid 148 + /// dangling pointers. 149 + #[inline] 150 + pub fn register<'a>( 151 + self: Pin<&'a AtomicRefGuard<T>>, 152 + value_guard: Pin<&'a AtomicValueGuard<T>>, 153 + ) { 154 + value_guard.replace_ref_guard(Some(self.get_ref().into())); 155 + self.replace_value_guard(Some(value_guard.get_ref().into())); 156 + } 157 + } 158 + 159 + impl<T: Copy> AtomicRefGuard<T> { 160 + /// Gets a copy of the value stored inside the `ValueGuard` this `RefGuard` 161 + /// references. 162 + #[inline] 163 + pub fn get(&self) -> Option<T> { 164 + self.value_guard 165 + .get() 166 + .map(|guard| unsafe { (*guard.as_ptr()).get() }) 167 + } 168 + } 169 + 170 + impl<T> Drop for AtomicRefGuard<T> { 171 + #[inline] 172 + fn drop(&mut self) { 173 + self.replace_value_guard(None); 174 + } 175 + } 176 + 177 + impl<T> Default for AtomicRefGuard<T> { 178 + #[inline] 179 + fn default() -> Self { 180 + Self::new() 181 + } 182 + } 183 + 184 + #[cfg(test)] 185 + mod test { 186 + use core::{mem, pin}; 187 + 188 + extern crate alloc; 189 + 190 + use super::*; 191 + 192 + #[test] 193 + fn basic() { 194 + let weak = pin::pin!(AtomicRefGuard::new()); 195 + { 196 + let strong = pin::pin!(AtomicValueGuard::new(2)); 197 + weak.as_ref().register(strong.as_ref()); 198 + 199 + assert_eq!(strong.get(), 2); 200 + assert_eq!(weak.get(), Some(2)); 201 + 202 + strong.as_ref().set(3); 203 + assert_eq!(strong.get(), 3); 204 + assert_eq!(weak.get(), Some(3)); 205 + } 206 + 207 + assert_eq!(weak.get(), None); 208 + } 209 + 210 + #[test] 211 + fn multiple_registrations() { 212 + let weak1 = pin::pin!(AtomicRefGuard::new()); 213 + let weak2 = pin::pin!(AtomicRefGuard::new()); 214 + { 215 + let strong = pin::pin!(AtomicValueGuard::new(2)); 216 + weak1.as_ref().register(strong.as_ref()); 217 + 218 + assert_eq!(strong.get(), 2); 219 + assert_eq!(weak1.get(), Some(2)); 220 + 221 + strong.as_ref().set(3); 222 + assert_eq!(strong.get(), 3); 223 + assert_eq!(weak1.get(), Some(3)); 224 + 225 + // register next ptr, should invalidate previous weak ref (weak1) 226 + weak2.as_ref().register(strong.as_ref()); 227 + assert_eq!(weak1.get(), None); 228 + assert_eq!(weak1.value_guard.get(), None); 229 + 230 + assert_eq!(strong.get(), 3); 231 + assert_eq!(weak2.get(), Some(3)); 232 + 233 + strong.as_ref().set(4); 234 + assert_eq!(strong.get(), 4); 235 + assert_eq!(weak2.get(), Some(4)); 236 + } 237 + 238 + assert_eq!(weak1.get(), None); 239 + assert_eq!(weak2.get(), None); 240 + } 241 + 242 + #[test] 243 + #[cfg_attr(miri, ignore)] 244 + fn safe_leak() { 245 + let strong = alloc::boxed::Box::pin(AtomicValueGuard::new(10)); 246 + let weak = pin::pin!(AtomicRefGuard::new()); 247 + weak.as_ref().register(strong.as_ref()); 248 + 249 + // strong is now a ValueGuard on the heap that will never be freed 250 + // this is sound because it will never be overwritten 251 + mem::forget(strong); 252 + 253 + assert_eq!(weak.get(), Some(10)); 254 + } 255 + }
+237
lifetime-guard/src/guard.rs
··· 1 + use core::{cell::Cell, marker::PhantomPinned, pin::Pin, ptr::NonNull}; 2 + 3 + /// Strong guard for granting read access to a single interior mutable value to 4 + /// [`RefGuard`](RefGuard). 5 + /// 6 + /// A `ValueGuard`:`RefGuard` relationship is exclusive, and behaves similarly 7 + /// to a single `Rc` and `Weak` pair, but notably does not require heap 8 + /// allocation. 9 + /// 10 + /// # Safety 11 + /// 12 + /// This struct *must* not be leaked to the stack using `mem::forget` or any 13 + /// other mechanism that causes the contents of `Self` to be overwritten 14 + /// without `Drop::drop()` running. 15 + /// Doing so creates unsoundness that likely will lead to dereferencing a null 16 + /// pointer. 17 + /// 18 + /// Note that it is sound to leak `Self` to the heap using methods including 19 + /// `Box::leak()` because heap allocated data will never be overwritten if it 20 + /// is never freed. 21 + pub struct ValueGuard<T> { 22 + /// Contains the value being immutably accessed by `RefGuard` and 23 + /// mutably accessed by `Self` 24 + /// 25 + /// This needs to be a cell so that the original immutable alias 26 + /// to `Self` (given to `RefGuard`) can continue to be referenced after 27 + /// invalidated by the creation of a mutable alias for `Self::set`. 28 + data: Cell<T>, 29 + /// A pointer to a `RefGuard` with read access to `data` to invalidate that 30 + /// `RefGuard` when `Self` is dropped. 31 + ref_guard: Cell<Option<NonNull<RefGuard<T>>>>, 32 + _marker: PhantomPinned, 33 + } 34 + 35 + impl<T> ValueGuard<T> { 36 + /// Creates a new `ValueGuard` containing `data`. 37 + #[inline] 38 + pub fn new(data: T) -> Self { 39 + Self { 40 + data: Cell::new(data), 41 + ref_guard: Cell::new(None), 42 + _marker: PhantomPinned, 43 + } 44 + } 45 + 46 + /// Sets the internal value stored by `Self`. 47 + #[inline] 48 + pub fn set(&self, value: T) { 49 + self.data.set(value); 50 + } 51 + 52 + #[inline] 53 + fn invalidate_ref_guard(&self) { 54 + self.ref_guard.set(None); 55 + } 56 + 57 + #[inline] 58 + fn replace_ref_guard(&self, ref_guard: Option<NonNull<RefGuard<T>>>) { 59 + if let Some(guard) = self.ref_guard.replace(ref_guard) { 60 + unsafe { (*guard.as_ptr()).invalidate_value_guard() }; 61 + } 62 + } 63 + } 64 + 65 + impl<T: Copy> ValueGuard<T> { 66 + /// Gets a copy of the value stored inside this `ValueGuard`. 67 + #[inline] 68 + pub fn get(&self) -> T { 69 + self.data.get() 70 + } 71 + } 72 + 73 + impl<T> Drop for ValueGuard<T> { 74 + #[inline] 75 + fn drop(&mut self) { 76 + self.replace_ref_guard(None); 77 + } 78 + } 79 + 80 + /// Weak guard for acquiring read only access to a `ValueGuard`'s value. 81 + /// 82 + /// Provides [`WeakGuard::register()`](Self::register) to register a `ValueGuard` 83 + /// to `Self` and vice versa. 84 + /// 85 + /// # Safety 86 + /// 87 + /// This struct *must* not be leaked to the stack using `mem::forget` or any 88 + /// other mechanism that causes the contents of `Self` to be overwritten 89 + /// without `Drop::drop()` running. 90 + /// Doing so creates unsoundness that likely will lead to dereferencing a null 91 + /// pointer. 92 + /// 93 + /// Note that it is sound to leak `Self` to the heap using methods including 94 + /// `Box::leak()` because heap allocated data will never be overwritten if it 95 + /// is never freed. 96 + pub struct RefGuard<T> { 97 + value_guard: Cell<Option<NonNull<ValueGuard<T>>>>, 98 + _marker: PhantomPinned, 99 + } 100 + 101 + impl<T> RefGuard<T> { 102 + /// Creates a new `RefGuard` with no reference to a `ValueGuard`. 103 + #[inline] 104 + pub fn new() -> Self { 105 + Self { 106 + value_guard: Cell::new(None), 107 + _marker: PhantomPinned, 108 + } 109 + } 110 + 111 + #[inline] 112 + fn invalidate_value_guard(&self) { 113 + self.value_guard.set(None); 114 + } 115 + 116 + #[inline] 117 + fn replace_value_guard(&self, value_guard: Option<NonNull<ValueGuard<T>>>) { 118 + if let Some(guard) = self.value_guard.replace(value_guard) { 119 + unsafe { (*guard.as_ptr()).invalidate_ref_guard() } 120 + } 121 + } 122 + 123 + /// Binds a pinned `value_guard` to `self`. 124 + /// 125 + /// This means they will reference each other, and will invalidate their 126 + /// references to each other when dropped. 127 + /// 128 + /// This method also invalidates the existing references held by the 129 + /// now-replaced referencees of `self` and `value_guard` to avoid 130 + /// dangling pointers. 131 + #[inline] 132 + pub fn register<'a>( 133 + self: Pin<&'a RefGuard<T>>, 134 + value_guard: Pin<&'a ValueGuard<T>>, 135 + ) { 136 + value_guard.replace_ref_guard(Some(self.get_ref().into())); 137 + self.replace_value_guard(Some(value_guard.get_ref().into())); 138 + } 139 + } 140 + 141 + impl<T: Copy> RefGuard<T> { 142 + /// Gets a copy of the value stored inside the `ValueGuard` this `RefGuard` 143 + /// references. 144 + #[inline] 145 + pub fn get(&self) -> Option<T> { 146 + self.value_guard 147 + .get() 148 + .map(|guard| unsafe { (*guard.as_ptr()).get() }) 149 + } 150 + } 151 + 152 + impl<T> Drop for RefGuard<T> { 153 + #[inline] 154 + fn drop(&mut self) { 155 + self.replace_value_guard(None); 156 + } 157 + } 158 + 159 + impl<T: Copy> Default for RefGuard<T> { 160 + #[inline] 161 + fn default() -> Self { 162 + Self::new() 163 + } 164 + } 165 + 166 + #[cfg(test)] 167 + mod test { 168 + use core::{mem, pin}; 169 + 170 + extern crate alloc; 171 + 172 + use super::*; 173 + 174 + #[test] 175 + fn basic() { 176 + let weak = pin::pin!(RefGuard::new()); 177 + { 178 + let strong = pin::pin!(ValueGuard::new(2)); 179 + weak.as_ref().register(strong.as_ref()); 180 + 181 + assert_eq!(strong.get(), 2); 182 + assert_eq!(weak.get(), Some(2)); 183 + 184 + strong.as_ref().set(3); 185 + assert_eq!(strong.get(), 3); 186 + assert_eq!(weak.get(), Some(3)); 187 + } 188 + 189 + assert_eq!(weak.get(), None); 190 + } 191 + 192 + #[test] 193 + fn multiple_registrations() { 194 + let weak1 = pin::pin!(RefGuard::new()); 195 + let weak2 = pin::pin!(RefGuard::new()); 196 + { 197 + let strong = pin::pin!(ValueGuard::new(2)); 198 + weak1.as_ref().register(strong.as_ref()); 199 + 200 + assert_eq!(strong.get(), 2); 201 + assert_eq!(weak1.get(), Some(2)); 202 + 203 + strong.as_ref().set(3); 204 + assert_eq!(strong.get(), 3); 205 + assert_eq!(weak1.get(), Some(3)); 206 + 207 + // register next ptr, should invalidate previous weak ref (weak1) 208 + weak2.as_ref().register(strong.as_ref()); 209 + assert_eq!(weak1.get(), None); 210 + assert_eq!(weak1.value_guard.get(), None); 211 + 212 + assert_eq!(strong.get(), 3); 213 + assert_eq!(weak2.get(), Some(3)); 214 + 215 + strong.as_ref().set(4); 216 + assert_eq!(strong.get(), 4); 217 + assert_eq!(weak2.get(), Some(4)); 218 + } 219 + 220 + assert_eq!(weak1.get(), None); 221 + assert_eq!(weak2.get(), None); 222 + } 223 + 224 + #[test] 225 + #[cfg_attr(miri, ignore)] 226 + fn safe_leak() { 227 + let strong = alloc::boxed::Box::pin(ValueGuard::new(10)); 228 + let weak = pin::pin!(RefGuard::new()); 229 + weak.as_ref().register(strong.as_ref()); 230 + 231 + // strong is now a ValueGuard on the heap that will never be freed 232 + // this is sound because it will never be overwritten 233 + mem::forget(strong); 234 + 235 + assert_eq!(weak.get(), Some(10)); 236 + } 237 + }
+8
lifetime-guard/src/lib.rs
··· 1 + #![doc = include_str!("../README.md")] 2 + #![no_std] 3 + 4 + // pub mod base; 5 + pub mod guard; 6 + 7 + #[cfg(feature = "atomics")] 8 + pub mod atomic_guard;