@@ -17,31 +17,12 @@ def redis_addr(request):
17
17
return host , int (port )
18
18
19
19
20
- async def pipe (
21
- reader : asyncio .StreamReader ,
22
- writer : asyncio .StreamWriter ,
23
- proxy : "DelayProxy" ,
24
- name = "" ,
25
- event : asyncio .Event = None ,
26
- ):
27
- while True :
28
- data = await reader .read (1000 )
29
- if not data :
30
- break
31
- if event :
32
- event .set ()
33
- await asyncio .sleep (proxy .delay )
34
- writer .write (data )
35
- await writer .drain ()
36
-
37
-
38
20
class DelayProxy :
39
21
def __init__ (self , addr , redis_addr , delay : float ):
40
22
self .addr = addr
41
23
self .redis_addr = redis_addr
42
24
self .delay = delay
43
25
self .send_event = asyncio .Event ()
44
- self .redis_streams = None
45
26
46
27
async def start (self ):
47
28
# test that we can connect to redis
@@ -52,31 +33,48 @@ async def start(self):
52
33
self .ROUTINE = asyncio .create_task (self .server .serve_forever ())
53
34
54
35
@contextlib .contextmanager
55
- def override (self , delay : float = 0.0 ):
36
+ def set_delay (self , delay : float = 0.0 ):
56
37
"""
57
38
Allow to override the delay for parts of tests which aren't time dependent,
58
39
to speed up execution.
59
40
"""
60
- old = self .delay
41
+ old_delay = self .delay
61
42
self .delay = delay
62
43
try :
63
44
yield
64
45
finally :
65
- self .delay = old
46
+ self .delay = old_delay
66
47
67
48
async def handle (self , reader , writer ):
68
49
# establish connection to redis
69
50
redis_reader , redis_writer = await asyncio .open_connection (* self .redis_addr )
70
51
try :
71
52
pipe1 = asyncio .create_task (
72
- pipe (reader , redis_writer , self , "to redis:" , self .send_event )
53
+ self . pipe (reader , redis_writer , "to redis:" , self .send_event )
73
54
)
74
- pipe2 = asyncio .create_task (pipe (redis_reader , writer , self , "from redis:" ))
55
+ pipe2 = asyncio .create_task (self . pipe (redis_reader , writer , "from redis:" ))
75
56
await asyncio .gather (pipe1 , pipe2 )
76
57
finally :
77
58
redis_writer .close ()
78
59
redis_reader .close ()
79
60
61
+ async def pipe (
62
+ self ,
63
+ reader : asyncio .StreamReader ,
64
+ writer : asyncio .StreamWriter ,
65
+ name = "" ,
66
+ event : asyncio .Event = None ,
67
+ ):
68
+ while True :
69
+ data = await reader .read (1000 )
70
+ if not data :
71
+ break
72
+ if event :
73
+ event .set ()
74
+ await asyncio .sleep (self .delay )
75
+ writer .write (data )
76
+ await writer .drain ()
77
+
80
78
async def stop (self ):
81
79
# clean up enough so that we can reuse the looper
82
80
self .ROUTINE .cancel ()
@@ -101,7 +99,7 @@ async def test_standalone(delay, redis_addr):
101
99
# note that we connect to proxy, rather than to Redis directly
102
100
async with Redis (host = "127.0.0.1" , port = 5380 , single_connection_client = b ) as r :
103
101
104
- with dp .override ( ):
102
+ with dp .set_delay ( 0 ):
105
103
await r .set ("foo" , "foo" )
106
104
await r .set ("bar" , "bar" )
107
105
@@ -117,7 +115,7 @@ async def test_standalone(delay, redis_addr):
117
115
118
116
# make sure that our previous request, cancelled while waiting for
119
117
# a repsponse, didn't leave the connection open andin a bad state
120
- with dp .override ( ):
118
+ with dp .set_delay ( 0 ):
121
119
assert await r .get ("bar" ) == b"bar"
122
120
assert await r .ping ()
123
121
assert await r .get ("foo" ) == b"foo"
@@ -132,7 +130,7 @@ async def test_standalone_pipeline(delay, redis_addr):
132
130
await dp .start ()
133
131
for b in [True , False ]:
134
132
async with Redis (host = "127.0.0.1" , port = 5380 , single_connection_client = b ) as r :
135
- with dp .override ( ):
133
+ with dp .set_delay ( 0 ):
136
134
await r .set ("foo" , "foo" )
137
135
await r .set ("bar" , "bar" )
138
136
@@ -154,7 +152,7 @@ async def test_standalone_pipeline(delay, redis_addr):
154
152
155
153
# we have now cancelled the pieline in the middle of a request, make sure
156
154
# that the connection is still usable
157
- with dp .override ( ):
155
+ with dp .set_delay ( 0 ):
158
156
pipe .get ("bar" )
159
157
pipe .ping ()
160
158
pipe .get ("foo" )
@@ -205,10 +203,10 @@ async def any_wait():
205
203
)
206
204
207
205
@contextlib .contextmanager
208
- def all_override (delay : int = 0 ):
206
+ def set_delay (delay : int = 0 ):
209
207
with contextlib .ExitStack () as stack :
210
208
for p in proxies :
211
- stack .enter_context (p .override ( delay = delay ))
209
+ stack .enter_context (p .delay_as ( delay ))
212
210
yield
213
211
214
212
# start proxies
@@ -222,9 +220,9 @@ def all_override(delay: int = 0):
222
220
await r .set ("bar" , "bar" )
223
221
224
222
all_clear ()
225
- with all_override (delay = delay ):
223
+ with set_delay (delay = delay ):
226
224
t = asyncio .create_task (r .get ("foo" ))
227
- # cannot wait on the send event, we don't know which node will be used
225
+ # One of the proxies will handle our request, wait for it to send
228
226
await any_wait ()
229
227
await asyncio .sleep (delay )
230
228
t .cancel ()
0 commit comments