this repo has no description
at trunk 979 lines 34 kB view raw
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. (http://www.facebook.com)
# WARNING: This is a temporary copy of code from the cpython library to
# facilitate bringup. Please file a task for anything you change!
# flake8: noqa
# fmt: off
"""Support for tasks, coroutines and the scheduler."""

__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)

import concurrent.futures
import contextvars
import functools
import inspect
import itertools
import types
import warnings
import weakref

from . import base_tasks
from . import coroutines
from . import events
from . import exceptions
from . import futures
from .coroutines import _is_coroutine

# Helper to generate new task names
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe. See bpo-11866 for a longer explanation.
_task_name_counter = itertools.count(1).__next__


def current_task(loop=None):
    """Return a currently executed task.

    If *loop* is None, the loop returned by events.get_running_loop() is
    used.  Returns None when no task is recorded as current for the loop.
    """
    if loop is None:
        loop = events.get_running_loop()
    return _current_tasks.get(loop)


def all_tasks(loop=None):
    """Return a set of all tasks for the loop.

    Only tasks that are not yet done are included.  If *loop* is None,
    the loop returned by events.get_running_loop() is used.
    """
    if loop is None:
        loop = events.get_running_loop()
    # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
    # thread while we do so. Therefore we cast it to list prior to filtering. The list
    # cast itself requires iteration, so we repeat it several times ignoring
    # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
    # details.
    i = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            i += 1
            if i >= 1000:
                # Give up after 1000 failed snapshots and surface the error.
                raise
        else:
            break
    return {t for t in tasks
            if futures._get_loop(t) is loop and not t.done()}


def _all_tasks_compat(loop=None):
    """Return a set of all tasks for the loop, including completed ones.

    If *loop* is None, the loop returned by events.get_event_loop() is
    used (unlike all_tasks(), which uses get_running_loop()).
    """
    # Different from "all_tasks()" by returning *all* Tasks, including
    # the completed ones.  Used to implement deprecated "Task.all_tasks()"
    # method.
    if loop is None:
        loop = events.get_event_loop()
    # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
    # thread while we do so. Therefore we cast it to list prior to filtering. The list
    # cast itself requires iteration, so we repeat it several times ignoring
    # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
    # details.
    i = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            i += 1
            if i >= 1000:
                # Give up after 1000 failed snapshots and surface the error.
                raise
        else:
            break
    return {t for t in tasks if futures._get_loop(t) is loop}


def _set_task_name(task, name):
    """Call task.set_name(name) if *name* is not None.

    Objects without a set_name attribute are silently left unnamed.
    """
    if name is not None:
        try:
            set_name = task.set_name
        except AttributeError:
            pass
        else:
            set_name(name)


class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        warnings.warn("Task.current_task() is deprecated since Python 3.7, "
                      "use asyncio.current_task() instead",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        warnings.warn("Task.all_tasks() is deprecated since Python 3.7, "
                      "use asyncio.all_tasks() instead",
                      DeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap *coro* and schedule its first step on the loop.

        Raises TypeError if *coro* is not a coroutine.  When *name* is
        None a name of the form 'Task-<n>' is generated; otherwise
        str(name) is used.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        # Every step of the coroutine runs in this copied context.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self._step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a still-pending task to the loop's exception handler."""
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def _repr_info(self):
        """Return repr details as computed by base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        """Return the coroutine object wrapped by this task."""
        return self._coro

    def get_name(self):
        """Return the name of this task."""
        return self._name

    def set_name(self, value):
        """Set the name of this task to str(value)."""
        self._name = str(value)

    def set_result(self, result):
        """Always raise RuntimeError: a Task's result comes from its coroutine."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: a Task's exception comes from its coroutine."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        With *exc* None the coroutine is resumed via send(None);
        otherwise *exc* is thrown into it.  A pending cancellation
        request (_must_cancel) replaces *exc* with a CancelledError.
        Depending on the outcome the task is finished, parked on the
        yielded future (resumed later through __wakeup), or _step is
        rescheduled on the loop.

        Raises InvalidStateError if the task is already done.
        """
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = exceptions.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel()
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, then let it propagate.
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self._step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self._step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was requested while the coroutine
                            # was running; forward it to the new waiter.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self._step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self._step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self._step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback for the awaited future: resume the coroutine.

        If future.result() raises, the exception is passed to _step()
        so it is thrown into the coroutine; otherwise _step() is called
        with no argument.
        """
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self._step()
        self = None  # Needed to break cycles when an exception occurs.


_PyTask = Task


try:
    import _asyncio
except ImportError:
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task


def create_task(coro, *, name=None):
    """Schedule the execution of a coroutine object in a spawn task.

    The task is created on the loop returned by
    events.get_running_loop(); *name*, if not None, is applied through
    _set_task_name() after creation.

    Return a Task object.
    """
    loop = events.get_running_loop()
    task = loop.create_task(coro)
    _set_task_name(task, name)
    return task


# wait() and as_completed() similar to those in PEP 3148.

FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED


async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The sequence futures must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.

    Raises TypeError if *fs* is itself a future or coroutine, and
    ValueError if *fs* is empty or *return_when* is not one of
    FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    # set(fs) de-duplicates the inputs before they are wrapped.
    fs = {ensure_future(f, loop=loop) for f in set(fs)}

    return await _wait(fs, timeout, return_when, loop)


def _release_waiter(waiter, *args):
    """Resolve *waiter* with None unless it is already done.

    Extra positional arguments are ignored so this can be used both as
    a call_later() callback and as a future done-callback.
    """
    if not waiter.done():
        waiter.set_result(None)


async def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if timeout is None:
        return await fut

    if timeout <= 0:
        # No time budget: return the result only if it is already there.
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        fut.cancel()
        raise exceptions.TimeoutError()

    # 'waiter' is released either by the timer or by fut completing.
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            await waiter
        except exceptions.CancelledError:
            fut.remove_done_callback(cb)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()
        else:
            fut.remove_done_callback(cb)
            # We must ensure that the task is not running
            # after wait_for() returns.
            # See https://bugs.python.org/issue32751
            await _cancel_and_wait(fut, loop=loop)
            raise exceptions.TimeoutError()
    finally:
        timeout_handle.cancel()


async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures.

    Returns a (done, pending) pair of sets partitioning *fs*.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        # Release the waiter once the return_when condition is satisfied.
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending


async def _cancel_and_wait(fut, loop):
    """Cancel the *fut* future or task and wait until it completes."""

    waiter = loop.create_future()
    cb = functools.partial(_release_waiter, waiter)
    fut.add_done_callback(cb)

    try:
        fut.cancel()
        # We cannot wait on *fut* directly to make
        # sure _cancel_and_wait itself is reliably cancellable.
        await waiter
    finally:
        fut.remove_done_callback(cb)


# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")

    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue(loop=loop)

    if loop is None:
        loop = events.get_event_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    timeout_handle = None

    def _on_timeout():
        # Detach from every outstanding future and wake one waiter with
        # the None sentinel.
        for f in todo:
            f.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        # Move a finished future from 'todo' to the 'done' queue.
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    async def _wait_for_one():
        # Return the result of the next finished future, or raise.
        f = await done.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return f.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    for _ in range(len(todo)):
        yield _wait_for_one()


@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is set to 0.  It uses a bare 'yield'
    expression (which Task._step knows how to handle)
    instead of creating a Future object.
    """
    yield


async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    Returns *result*.  A *delay* of zero or less only yields control
    for one event loop iteration.
    """
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        # Drop the timer whether we completed, failed or were cancelled.
        h.cancel()


def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.

    Raises ValueError if a Future is passed together with a *loop* it
    does not belong to, and TypeError if the argument is neither a
    coroutine, a Future nor an awaitable.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    elif futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')


@types.coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Wraps awaitable (an object with __await__) into a coroutine
    that will later be wrapped in a Task by ensure_future().
    """
    return (yield from awaitable.__await__())

_wrap_awaitable._is_coroutine = _is_coroutine


class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        """Remember *children* so cancel() can forward to them."""
        super().__init__(loop=loop)
        self._children = children
        self._cancel_requested = False

    def cancel(self):
        """Cancel every child; return True if any child accepted it.

        Returns False without touching the children if this future is
        already done.
        """
        if self.done():
            return False
        ret = False
        for child in self._children:
            if child.cancel():
                ret = True
        if ret:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return ret


def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    If *return_exceptions* is False, cancelling gather() after it
    has been marked done won't cancel any submitted awaitables.
    For instance, gather can be marked done after propagating an
    exception to the caller, therefore, calling ``gather.cancel()``
    after catching an exception (raised by one of the awaitables) from
    gather won't cancel any other awaitables.
    """
    if not coros_or_futures:
        # Nothing to gather: return an already-resolved future.
        if loop is None:
            loop = events.get_event_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _done_callback(fut):
        # Runs once per distinct child; settles 'outer' when appropriate.
        nonlocal nfinished
        nfinished += 1

        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                exc = exceptions.CancelledError()
                outer.set_exception(exc)
                return
            else:
                exc = fut.exception()
                if exc is not None:
                    outer.set_exception(exc)
                    return

        if nfinished == nfuts:
            # All futures are done; create a list of results
            # and set it to the 'outer' future.
            results = []

            for fut in children:
                if fut.cancelled():
                    # Check if 'fut' is cancelled first, as
                    # 'fut.exception()' will *raise* a CancelledError
                    # instead of returning it.
                    res = exceptions.CancelledError()
                else:
                    res = fut.exception()
                    if res is None:
                        res = fut.result()
                results.append(res)

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                outer.set_exception(exceptions.CancelledError())
            else:
                outer.set_result(results)

    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                # Adopt the loop of the first child for the rest.
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            nfuts += 1
            arg_to_fut[arg] = fut
            fut.add_done_callback(_done_callback)

        else:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer


def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut.
        return inner
    loop = futures._get_loop(inner)
    outer = loop.create_future()

    def _inner_done_callback(inner):
        # Mirror inner's outcome onto outer unless outer was cancelled.
        if outer.cancelled():
            if not inner.cancelled():
                # Mark inner's result as retrieved.
                inner.exception()
            return

        if inner.cancelled():
            outer.cancel()
        else:
            exc = inner.exception()
            if exc is not None:
                outer.set_exception(exc)
            else:
                outer.set_result(inner.result())


    def _outer_done_callback(outer):
        # Outer finished first: stop listening to inner.
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer


def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.

    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    future = concurrent.futures.Future()

    def callback():
        # Scheduled on *loop* via call_soon_threadsafe(): wrap the
        # coroutine and chain its outcome to 'future'.
        try:
            futures._chain_future(ensure_future(coro, loop=loop), future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future


# WeakSet containing all alive tasks.
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops.  {EventLoop: Task}
_current_tasks = {}


def _register_task(task):
    """Register a new task in asyncio as executed by loop."""
    _all_tasks.add(task)


def _enter_task(loop, task):
    """Record *task* as the current task of *loop*.

    Raises RuntimeError if another task is already current for *loop*.
    """
    current_task = _current_tasks.get(loop)
    if current_task is not None:
        raise RuntimeError(f"Cannot enter into task {task!r} while another "
                           f"task {current_task!r} is being executed.")
    _current_tasks[loop] = task


def _leave_task(loop, task):
    """Clear *task* as the current task of *loop*.

    Raises RuntimeError if *task* is not the task currently recorded
    for *loop*.
    """
    current_task = _current_tasks.get(loop)
    if current_task is not task:
        raise RuntimeError(f"Leaving task {task!r} does not match "
                           f"the current task {current_task!r}.")
    del _current_tasks[loop]


def _unregister_task(task):
    """Unregister a task."""
    _all_tasks.discard(task)


# Keep references to the pure-Python implementations before the names
# are (possibly) rebound to the _asyncio versions below.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    pass
else:
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task