@@ -167,34 +167,66 @@ async def test_standalone_pipeline(delay, redis_addr):
167
167
@pytest .mark .onlycluster
168
168
async def test_cluster (request , redis_addr ):
169
169
170
- # TODO: This test actually doesn't work. Once the RedisCluster initializes,
171
- # it will re-connect to the nodes as advertised by the cluster, bypassing
172
- # the single DelayProxy we set up.
173
- # to work around this, we really would nedd a port-remapper for the RedisCluster
170
+ delay = 0.1
171
+ cluster_port = 6372
172
+ remap_base = 7372
173
+ n_nodes = 6
174
+
175
+ remap = []
176
+ proxies = []
177
+ for i in range (n_nodes ):
178
+ port = cluster_port + i
179
+ remapped = remap_base + i
180
+ remap .append ({"from_port" : port , "to_port" : remapped })
181
+ forward_addr = redis_addr [0 ], port
182
+ proxy = DelayProxy (
183
+ addr = ("127.0.0.1" , remapped ), redis_addr = forward_addr , delay = delay
184
+ )
185
+ proxies .append (proxy )
174
186
175
- redis_addr = redis_addr [0 ], 6372 # use the cluster port
176
- dp = DelayProxy (addr = ("127.0.0.1" , 5381 ), redis_addr = redis_addr , delay = 0.1 )
177
- await dp .start ()
187
+ # start proxies
188
+ await asyncio .gather (* [p .start () for p in proxies ])
189
+
190
+ def all_clear ():
191
+ for p in proxies :
192
+ p .send_event .clear ()
178
193
179
- r = RedisCluster .from_url ("redis://127.0.0.1:5381" )
180
- await r .initialize ()
181
- with dp .override ():
194
+ async def wait_for_send ():
195
+ asyncio .wait (
196
+ [p .send_event .wait () for p in proxies ], return_when = asyncio .FIRST_COMPLETED
197
+ )
198
+
199
+ @contextlib .contextmanager
200
+ def override ():
201
+ with contextlib .ExitStack () as stack :
202
+ for p in proxies :
203
+ stack .enter_context (p .override ())
204
+ yield
205
+
206
+ with override ():
207
+ r = RedisCluster .from_url (
208
+ f"redis://127.0.0.1:{ remap_base } " , host_port_remap = remap
209
+ )
210
+ await r .initialize ()
182
211
await r .set ("foo" , "foo" )
183
212
await r .set ("bar" , "bar" )
184
213
185
- dp . send_event . clear ()
214
+ all_clear ()
186
215
t = asyncio .create_task (r .get ("foo" ))
187
- # await dp.send_event.wait() # won"t work, because DelayProxy is by-passed
188
- await asyncio .sleep (0.05 )
216
+ # cannot wait on the send event, we don't know which node will be used
217
+ await wait_for_send ()
218
+ await asyncio .sleep (delay )
189
219
t .cancel ()
190
- try :
220
+ with pytest . raises ( asyncio . CancelledError ) :
191
221
await t
192
- except asyncio .CancelledError :
193
- pass
194
222
195
- with dp .override ():
196
- assert await r .get ("bar" ) == b"bar"
197
- assert await r .ping ()
198
- assert await r .get ("foo" ) == b"foo"
223
+ with override ():
224
+ # try a number of requests to excercise all the connections
225
+ async def doit ():
226
+ assert await r .get ("bar" ) == b"bar"
227
+ assert await r .ping ()
228
+ assert await r .get ("foo" ) == b"foo"
199
229
200
- await dp .stop ()
230
+ await asyncio .gather (* [doit () for _ in range (10 )])
231
+
232
+ await asyncio .gather (* (p .stop () for p in proxies ))
0 commit comments