@@ -27,12 +27,21 @@ def __init__(self, addr, redis_addr, delay: float = 0.0):
27
27
self .delay = delay
28
28
self .send_event = asyncio .Event ()
29
29
30
+ async def __aenter__ (self ):
31
+ await self .start ()
32
+ return self
33
+
34
+ async def __aexit__ (self , * args ):
35
+ await self .stop ()
36
+
30
37
async def start (self ):
31
38
# test that we can connect to redis
32
39
async with async_timeout (2 ):
33
40
_ , redis_writer = await asyncio .open_connection (* self .redis_addr )
34
41
redis_writer .close ()
35
- self .server = await asyncio .start_server (self .handle , * self .addr )
42
+ self .server = await asyncio .start_server (
43
+ self .handle , * self .addr , reuse_address = True
44
+ )
36
45
self .ROUTINE = asyncio .create_task (self .server .serve_forever ())
37
46
38
47
@contextlib .contextmanager
@@ -95,91 +104,89 @@ async def test_standalone(delay, redis_addr):
95
104
96
105
# create a tcp socket proxy that relays data to Redis and back,
97
106
# inserting 0.1 seconds of delay
98
- dp = DelayProxy (addr = ("127.0.0.1" , 5380 ), redis_addr = redis_addr )
99
- await dp .start ()
100
-
101
- for b in [True , False ]:
102
- # note that we connect to proxy, rather than to Redis directly
103
- async with Redis (host = "127.0.0.1" , port = 5380 , single_connection_client = b ) as r :
104
-
105
- await r .set ("foo" , "foo" )
106
- await r .set ("bar" , "bar" )
107
-
108
- async def op (r ):
109
- with dp .set_delay (delay * 2 ):
110
- return await r .get (
111
- "foo"
112
- ) # <-- this is the operation we want to cancel
113
-
114
- dp .send_event .clear ()
115
- t = asyncio .create_task (op (r ))
116
- # Wait until the task has sent, and then some, to make sure it has
117
- # settled on the read.
118
- await dp .send_event .wait ()
119
- await asyncio .sleep (0.01 ) # a little extra time for prudence
120
- t .cancel ()
121
- with pytest .raises (asyncio .CancelledError ):
122
- await t
123
-
124
- # make sure that our previous request, cancelled while waiting for
125
- # a repsponse, didn't leave the connection open andin a bad state
126
- assert await r .get ("bar" ) == b"bar"
127
- assert await r .ping ()
128
- assert await r .get ("foo" ) == b"foo"
129
-
130
- await dp .stop ()
107
+ async with DelayProxy (addr = ("127.0.0.1" , 5380 ), redis_addr = redis_addr ) as dp :
108
+
109
+ for b in [True , False ]:
110
+ # note that we connect to proxy, rather than to Redis directly
111
+ async with Redis (
112
+ host = "127.0.0.1" , port = 5380 , single_connection_client = b
113
+ ) as r :
114
+
115
+ await r .set ("foo" , "foo" )
116
+ await r .set ("bar" , "bar" )
117
+
118
+ async def op (r ):
119
+ with dp .set_delay (delay * 2 ):
120
+ return await r .get (
121
+ "foo"
122
+ ) # <-- this is the operation we want to cancel
123
+
124
+ dp .send_event .clear ()
125
+ t = asyncio .create_task (op (r ))
126
+ # Wait until the task has sent, and then some, to make sure it has
127
+ # settled on the read.
128
+ await dp .send_event .wait ()
129
+ await asyncio .sleep (0.01 ) # a little extra time for prudence
130
+ t .cancel ()
131
+ with pytest .raises (asyncio .CancelledError ):
132
+ await t
133
+
134
+ # make sure that our previous request, cancelled while waiting for
135
+ # a repsponse, didn't leave the connection open andin a bad state
136
+ assert await r .get ("bar" ) == b"bar"
137
+ assert await r .ping ()
138
+ assert await r .get ("foo" ) == b"foo"
131
139
132
140
133
141
@pytest .mark .onlynoncluster
134
142
@pytest .mark .parametrize ("delay" , argvalues = [0.05 , 0.5 , 1 , 2 ])
135
143
async def test_standalone_pipeline (delay , redis_addr ):
136
- dp = DelayProxy (addr = ("127.0.0.1" , 5380 ), redis_addr = redis_addr )
137
- await dp .start ()
138
- for b in [True , False ]:
139
- async with Redis (host = "127.0.0.1" , port = 5380 , single_connection_client = b ) as r :
140
- await r .set ("foo" , "foo" )
141
- await r .set ("bar" , "bar" )
142
-
143
- pipe = r .pipeline ()
144
-
145
- pipe2 = r .pipeline ()
146
- pipe2 .get ("bar" )
147
- pipe2 .ping ()
148
- pipe2 .get ("foo" )
149
-
150
- async def op (pipe ):
151
- with dp .set_delay (delay * 2 ):
152
- return await pipe .get (
153
- "foo"
154
- ).execute () # <-- this is the operation we want to cancel
155
-
156
- dp .send_event .clear ()
157
- t = asyncio .create_task (op (pipe ))
158
- # wait until task has settled on the read
159
- await dp .send_event .wait ()
160
- await asyncio .sleep (0.01 )
161
- t .cancel ()
162
- with pytest .raises (asyncio .CancelledError ):
163
- await t
164
-
165
- # we have now cancelled the pieline in the middle of a request, make sure
166
- # that the connection is still usable
167
- pipe .get ("bar" )
168
- pipe .ping ()
169
- pipe .get ("foo" )
170
- await pipe .reset ()
171
-
172
- # check that the pipeline is empty after reset
173
- assert await pipe .execute () == []
174
-
175
- # validating that the pipeline can be used as it could previously
176
- pipe .get ("bar" )
177
- pipe .ping ()
178
- pipe .get ("foo" )
179
- assert await pipe .execute () == [b"bar" , True , b"foo" ]
180
- assert await pipe2 .execute () == [b"bar" , True , b"foo" ]
181
-
182
- await dp .stop ()
144
+ async with DelayProxy (addr = ("127.0.0.1" , 5380 ), redis_addr = redis_addr ) as dp :
145
+ for b in [True , False ]:
146
+ async with Redis (
147
+ host = "127.0.0.1" , port = 5380 , single_connection_client = b
148
+ ) as r :
149
+ await r .set ("foo" , "foo" )
150
+ await r .set ("bar" , "bar" )
151
+
152
+ pipe = r .pipeline ()
153
+
154
+ pipe2 = r .pipeline ()
155
+ pipe2 .get ("bar" )
156
+ pipe2 .ping ()
157
+ pipe2 .get ("foo" )
158
+
159
+ async def op (pipe ):
160
+ with dp .set_delay (delay * 2 ):
161
+ return await pipe .get (
162
+ "foo"
163
+ ).execute () # <-- this is the operation we want to cancel
164
+
165
+ dp .send_event .clear ()
166
+ t = asyncio .create_task (op (pipe ))
167
+ # wait until task has settled on the read
168
+ await dp .send_event .wait ()
169
+ await asyncio .sleep (0.01 )
170
+ t .cancel ()
171
+ with pytest .raises (asyncio .CancelledError ):
172
+ await t
173
+
174
+ # we have now cancelled the pieline in the middle of a request, make sure
175
+ # that the connection is still usable
176
+ pipe .get ("bar" )
177
+ pipe .ping ()
178
+ pipe .get ("foo" )
179
+ await pipe .reset ()
180
+
181
+ # check that the pipeline is empty after reset
182
+ assert await pipe .execute () == []
183
+
184
+ # validating that the pipeline can be used as it could previously
185
+ pipe .get ("bar" )
186
+ pipe .ping ()
187
+ pipe .get ("foo" )
188
+ assert await pipe .execute () == [b"bar" , True , b"foo" ]
189
+ assert await pipe2 .execute () == [b"bar" , True , b"foo" ]
183
190
184
191
185
192
@pytest .mark .onlycluster
@@ -202,9 +209,6 @@ def remap(address):
202
209
proxy = DelayProxy (addr = ("127.0.0.1" , remapped ), redis_addr = forward_addr )
203
210
proxies .append (proxy )
204
211
205
- # start proxies
206
- await asyncio .gather (* [p .start () for p in proxies ])
207
-
208
212
def all_clear ():
209
213
for p in proxies :
210
214
p .send_event .clear ()
@@ -221,32 +225,36 @@ def set_delay(delay: float):
221
225
stack .enter_context (p .set_delay (delay ))
222
226
yield
223
227
224
- with contextlib .closing (
225
- RedisCluster .from_url (f"redis://127.0.0.1:{ remap_base } " , address_remap = remap )
226
- ) as r :
227
- await r .initialize ()
228
- await r .set ("foo" , "foo" )
229
- await r .set ("bar" , "bar" )
230
-
231
- async def op (r ):
232
- with set_delay (delay ):
233
- return await r .get ("foo" )
234
-
235
- all_clear ()
236
- t = asyncio .create_task (op (r ))
237
- # Wait for whichever DelayProxy gets the request first
238
- await wait_for_send ()
239
- await asyncio .sleep (0.01 )
240
- t .cancel ()
241
- with pytest .raises (asyncio .CancelledError ):
242
- await t
243
-
244
- # try a number of requests to excercise all the connections
245
- async def doit ():
246
- assert await r .get ("bar" ) == b"bar"
247
- assert await r .ping ()
248
- assert await r .get ("foo" ) == b"foo"
249
-
250
- await asyncio .gather (* [doit () for _ in range (10 )])
251
-
252
- await asyncio .gather (* (p .stop () for p in proxies ))
228
+ async with contextlib .AsyncExitStack () as stack :
229
+ for p in proxies :
230
+ await stack .enter_async_context (p )
231
+
232
+ with contextlib .closing (
233
+ RedisCluster .from_url (
234
+ f"redis://127.0.0.1:{ remap_base } " , address_remap = remap
235
+ )
236
+ ) as r :
237
+ await r .initialize ()
238
+ await r .set ("foo" , "foo" )
239
+ await r .set ("bar" , "bar" )
240
+
241
+ async def op (r ):
242
+ with set_delay (delay ):
243
+ return await r .get ("foo" )
244
+
245
+ all_clear ()
246
+ t = asyncio .create_task (op (r ))
247
+ # Wait for whichever DelayProxy gets the request first
248
+ await wait_for_send ()
249
+ await asyncio .sleep (0.01 )
250
+ t .cancel ()
251
+ with pytest .raises (asyncio .CancelledError ):
252
+ await t
253
+
254
+ # try a number of requests to excercise all the connections
255
+ async def doit ():
256
+ assert await r .get ("bar" ) == b"bar"
257
+ assert await r .ping ()
258
+ assert await r .get ("foo" ) == b"foo"
259
+
260
+ await asyncio .gather (* [doit () for _ in range (10 )])
0 commit comments