3
3
import queue
4
4
import time
5
5
import weakref
6
- import _xxinterpchannels as _channels
7
- import _xxinterpchannels as _queues
6
+ import _xxinterpqueues as _queues
8
7
9
8
# aliases:
10
- from _xxinterpchannels import (
11
- ChannelError as QueueError ,
12
- ChannelNotFoundError as QueueNotFoundError ,
9
+ from _xxinterpqueues import (
10
+ QueueError , QueueNotFoundError ,
13
11
)
14
12
15
13
__all__ = [
19
17
]
20
18
21
19
20
+ class QueueEmpty (_queues .QueueEmpty , queue .Empty ):
21
+ """Raised from get_nowait() when the queue is empty.
22
+
23
+ It is also raised from get() if it times out.
24
+ """
25
+
26
+
27
+ class QueueFull (_queues .QueueFull , queue .Full ):
28
+ """Raised from put_nowait() when the queue is full.
29
+
30
+ It is also raised from put() if it times out.
31
+ """
32
+
33
+
22
34
def create (maxsize = 0 ):
23
35
"""Return a new cross-interpreter queue.
24
36
25
37
The queue may be used to pass data safely between interpreters.
26
38
"""
27
- # XXX honor maxsize
28
- qid = _queues .create ()
29
- return Queue ._with_maxsize (qid , maxsize )
39
+ qid = _queues .create (maxsize )
40
+ return Queue (qid )
30
41
31
42
32
43
def list_all ():
@@ -35,53 +46,37 @@ def list_all():
35
46
for qid in _queues .list_all ()]
36
47
37
48
38
- class QueueEmpty (queue .Empty ):
39
- """Raised from get_nowait() when the queue is empty.
40
-
41
- It is also raised from get() if it times out.
42
- """
43
-
44
-
45
- class QueueFull (queue .Full ):
46
- """Raised from put_nowait() when the queue is full.
47
-
48
- It is also raised from put() if it times out.
49
- """
50
-
51
49
52
50
_known_queues = weakref .WeakValueDictionary ()
53
51
54
52
class Queue :
55
53
"""A cross-interpreter queue."""
56
54
57
- @classmethod
58
- def _with_maxsize (cls , id , maxsize ):
59
- if not isinstance (maxsize , int ):
60
- raise TypeError (f'maxsize must be an int, got { maxsize !r} ' )
61
- elif maxsize < 0 :
62
- maxsize = 0
63
- else :
64
- maxsize = int (maxsize )
65
- self = cls (id )
66
- self ._maxsize = maxsize
67
- return self
68
-
69
55
def __new__ (cls , id , / ):
70
56
# There is only one instance for any given ID.
71
57
if isinstance (id , int ):
72
- id = _channels . _channel_id (id , force = False )
73
- elif not isinstance ( id , _channels . ChannelID ) :
58
+ id = int (id )
59
+ else :
74
60
raise TypeError (f'id must be an int, got { id !r} ' )
75
- key = int (id )
76
61
try :
77
- self = _known_queues [key ]
62
+ self = _known_queues [id ]
78
63
except KeyError :
79
64
self = super ().__new__ (cls )
80
65
self ._id = id
81
- self . _maxsize = 0
82
- _known_queues [ key ] = self
66
+ _known_queues [ id ] = self
67
+ _queues . bind ( id )
83
68
return self
84
69
70
+ def __del__ (self ):
71
+ try :
72
+ _queues .release (self ._id )
73
+ except QueueNotFoundError :
74
+ pass
75
+ try :
76
+ del _known_queues [self ._id ]
77
+ except KeyError :
78
+ pass
79
+
85
80
def __repr__ (self ):
86
81
return f'{ type (self ).__name__ } ({ self .id } )'
87
82
@@ -90,39 +85,58 @@ def __hash__(self):
90
85
91
86
@property
92
87
def id (self ):
93
- return int ( self ._id )
88
+ return self ._id
94
89
95
90
@property
96
91
def maxsize (self ):
97
- return self . _maxsize
98
-
99
- @ property
100
- def _info (self ):
101
- return _channels . get_info ( self ._id )
92
+ try :
93
+ return self . _maxsize
94
+ except AttributeError :
95
+ self . _maxsize = _queues . get_maxsize (self . _id )
96
+ return self ._maxsize
102
97
103
98
def empty (self ):
104
- return self ._info . count == 0
99
+ return self .qsize () == 0
105
100
106
101
def full (self ):
107
- if self ._maxsize <= 0 :
108
- return False
109
- return self ._info .count >= self ._maxsize
102
+ return _queues .is_full (self ._id )
110
103
111
104
def qsize (self ):
112
- return self ._info . count
105
+ return _queues . get_count ( self ._id )
113
106
114
- def put (self , obj , timeout = None ):
115
- # XXX block if full
116
- _channels .send (self ._id , obj , blocking = False )
107
+ def put (self , obj , timeout = None , * ,
108
+ _delay = 10 / 1000 , # 10 milliseconds
109
+ ):
110
+ """Add the object to the queue.
111
+
112
+ This blocks while the queue is full.
113
+ """
114
+ if timeout is not None :
115
+ timeout = int (timeout )
116
+ if timeout < 0 :
117
+ raise ValueError (f'timeout value must be non-negative' )
118
+ end = time .time () + timeout
119
+ while True :
120
+ try :
121
+ _queues .put (self ._id , obj )
122
+ except _queues .QueueFull as exc :
123
+ if timeout is not None and time .time () >= end :
124
+ exc .__class__ = QueueFull
125
+ raise # re-raise
126
+ time .sleep (_delay )
127
+ else :
128
+ break
117
129
118
130
def put_nowait (self , obj ):
119
- # XXX raise QueueFull if full
120
- return _channels .send (self ._id , obj , blocking = False )
131
+ try :
132
+ return _queues .put (self ._id , obj )
133
+ except _queues .QueueFull as exc :
134
+ exc .__class__ = QueueFull
135
+ raise # re-raise
121
136
122
137
def get (self , timeout = None , * ,
123
- _sentinel = object (),
124
- _delay = 10 / 1000 , # 10 milliseconds
125
- ):
138
+ _delay = 10 / 1000 , # 10 milliseconds
139
+ ):
126
140
"""Return the next object from the queue.
127
141
128
142
This blocks while the queue is empty.
@@ -132,25 +146,27 @@ def get(self, timeout=None, *,
132
146
if timeout < 0 :
133
147
raise ValueError (f'timeout value must be non-negative' )
134
148
end = time .time () + timeout
135
- obj = _channels .recv (self ._id , _sentinel )
136
- while obj is _sentinel :
137
- time .sleep (_delay )
138
- if timeout is not None and time .time () >= end :
139
- raise QueueEmpty
140
- obj = _channels .recv (self ._id , _sentinel )
149
+ while True :
150
+ try :
151
+ return _queues .get (self ._id )
152
+ except _queues .QueueEmpty as exc :
153
+ if timeout is not None and time .time () >= end :
154
+ exc .__class__ = QueueEmpty
155
+ raise # re-raise
156
+ time .sleep (_delay )
141
157
return obj
142
158
143
- def get_nowait (self , * , _sentinel = object () ):
159
+ def get_nowait (self ):
144
160
"""Return the next object from the channel.
145
161
146
162
If the queue is empty then raise QueueEmpty. Otherwise this
147
163
is the same as get().
148
164
"""
149
- obj = _channels .recv (self ._id , _sentinel )
150
- if obj is _sentinel :
151
- raise QueueEmpty
152
- return obj
165
+ try :
166
+ return _queues .get (self ._id )
167
+ except _queues .QueueEmpty as exc :
168
+ exc .__class__ = QueueEmpty
169
+ raise # re-raise
153
170
154
171
155
- # XXX add this:
156
- #_channels._register_queue_type(Queue)
172
+ _queues ._register_queue_type (Queue )
0 commit comments