This repository was archived by the owner on May 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 278
/
Copy paththread_utils.py
98 lines (75 loc) · 3.14 KB
/
thread_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import itertools
from queue import PriorityQueue
from collections import deque
from collections.abc import Iterable
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures.thread import _WorkItem
from time import sleep
from typing import Any, Callable, Iterator, Optional
import attrs
class AutoPriorityQueue(PriorityQueue):
"""Overrides PriorityQueue to automatically get the priority from _WorkItem.kwargs
We also assign a unique id for each item, to avoid making comparisons on _WorkItem.
As a side effect, items with the same priority are returned FIFO.
"""
_counter = itertools.count().__next__
def put(self, item: Optional[_WorkItem], block=True, timeout=None) -> None:
priority = item.kwargs.pop("priority") if item is not None else 0
super().put((-priority, self._counter(), item), block, timeout)
def get(self, block=True, timeout=None) -> Optional[_WorkItem]:
_p, _c, work_item = super().get(block, timeout)
return work_item
class PriorityThreadPoolExecutor(ThreadPoolExecutor):
"""Overrides ThreadPoolExecutor to use AutoPriorityQueue
XXX WARNING: Might break in future versions of Python
"""
def __init__(self, *args) -> None:
super().__init__(*args)
self._work_queue = AutoPriorityQueue()
@attrs.define(frozen=False, init=False)
class ThreadedYielder(Iterable):
"""Yields results from multiple threads into a single iterator, ordered by priority.
To add a source iterator, call ``submit()`` with a function that returns an iterator.
Priority for the iterator can be provided via the keyword argument 'priority'. (higher runs first)
"""
_pool: ThreadPoolExecutor
_futures: deque
_yield: deque
_exception: Optional[None]
_pool: ThreadPoolExecutor
_futures: deque
_yield: deque = attrs.field(alias="_yield") # Python keyword!
_exception: Optional[None]
yield_list: bool
def __init__(self, max_workers: Optional[int] = None, yield_list: bool = False) -> None:
super().__init__()
self._pool = PriorityThreadPoolExecutor(max_workers)
self._futures = deque()
self._yield = deque()
self._exception = None
self.yield_list = yield_list
def _worker(self, fn, *args, **kwargs) -> None:
try:
res = fn(*args, **kwargs)
if res is not None:
if self.yield_list:
self._yield.append(res)
else:
self._yield += res
except Exception as e:
self._exception = e
def submit(self, fn: Callable, *args, priority: int = 0, **kwargs) -> None:
self._futures.append(self._pool.submit(self._worker, fn, *args, priority=priority, **kwargs))
def __iter__(self) -> Iterator[Any]:
while True:
if self._exception:
raise self._exception
while self._yield:
yield self._yield.popleft()
if not self._futures:
# No more tasks
return
if self._futures[0].done():
self._futures.popleft()
else:
sleep(0.001)