|
3 | 3 | from collections import deque
|
4 | 4 | from concurrent.futures import Future
|
5 | 5 | from queue import Queue
|
6 |
| -from typing import Deque, Iterator, Sized, Type, TypeVar |
| 6 | +from typing import Awaitable, Deque, Iterator, Sized, Type, TypeVar, cast |
7 | 7 |
|
8 | 8 | T = TypeVar("T")
|
9 | 9 |
|
@@ -75,26 +75,31 @@ class FIFOAsyncFutureResultCollection(DequeFutureResultCollection[T]):
|
75 | 75 | First In First Out
|
76 | 76 | """
|
77 | 77 |
|
| 78 | + def __init__(self, event_loop: asyncio.AbstractEventLoop) -> None: |
| 79 | + super().__init__() |
| 80 | + self.event_loop = event_loop |
| 81 | + |
78 | 82 | def __next__(self) -> T:
|
79 |
| - return asyncio.get_event_loop().run_until_complete(self._futures.popleft()) # type: ignore |
| 83 | + return self.event_loop.run_until_complete( |
| 84 | + cast(Awaitable[T], self._futures.popleft()) |
| 85 | + ) |
80 | 86 |
|
81 | 87 |
|
82 | 88 | class FDFOAsyncFutureResultCollection(CallbackFutureResultCollection[T]):
|
83 | 89 | """
|
84 | 90 | First Done First Out
|
85 | 91 | """
|
86 | 92 |
|
87 |
| - def __init__(self) -> None: |
| 93 | + def __init__(self, event_loop: asyncio.AbstractEventLoop) -> None: |
88 | 94 | super().__init__()
|
89 |
| - self._waiter: asyncio.futures.Future[T] = ( |
90 |
| - asyncio.get_event_loop().create_future() |
91 |
| - ) |
| 95 | + self.event_loop = event_loop |
| 96 | + self._waiter: asyncio.futures.Future[T] = self.event_loop.create_future() |
92 | 97 |
|
93 | 98 | def _done_callback(self, future: "Future[T]") -> None:
|
94 | 99 | self._waiter.set_result(future.result())
|
95 | 100 |
|
96 | 101 | def __next__(self) -> T:
|
97 |
| - result = asyncio.get_event_loop().run_until_complete(self._waiter) |
| 102 | + result = self.event_loop.run_until_complete(self._waiter) |
98 | 103 | self._n_futures -= 1
|
99 |
| - self._waiter = asyncio.get_event_loop().create_future() |
| 104 | + self._waiter = self.event_loop.create_future() |
100 | 105 | return result
|
0 commit comments