|
1 | 1 | /*
|
2 |
| - * Copyright 2017 the original author or authors. |
| 2 | + * Copyright 2017-2019 the original author or authors. |
3 | 3 | *
|
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
5 | 5 | * you may not use this file except in compliance with the License.
|
|
43 | 43 | import org.junit.Test;
|
44 | 44 | import org.mockito.Mockito;
|
45 | 45 |
|
| 46 | +import org.springframework.amqp.core.AcknowledgeMode; |
46 | 47 | import org.springframework.amqp.core.MessageListener;
|
47 | 48 | import org.springframework.amqp.rabbit.connection.ChannelProxy;
|
48 | 49 | import org.springframework.amqp.rabbit.connection.Connection;
|
@@ -175,7 +176,7 @@ else if (i.getArgument(0).equals(17L)) {
|
175 | 176 | }
|
176 | 177 | Thread.sleep(200);
|
177 | 178 | consumer.get().handleDelivery("consumerTag", envelope(16), props, body);
|
178 |
| - // should get 2 acks #10 and #6 (timeout) |
| 179 | + // should get 2 acks #10 and #16 (timeout) |
179 | 180 | assertTrue(latch2.await(10, TimeUnit.SECONDS));
|
180 | 181 | consumer.get().handleDelivery("consumerTag", envelope(17), props, body);
|
181 | 182 | verify(channel).basicAck(10L, true);
|
@@ -311,6 +312,56 @@ public void testMonitorCancelsAfterBadAckEvenIfChannelReportsOpen() throws Excep
|
311 | 312 | container.stop();
|
312 | 313 | }
|
313 | 314 |
|
| 315 | + @Test |
| 316 | + public void testMonitorCancelsAfterTargetChannelChanges() throws Exception { |
| 317 | + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); |
| 318 | + Connection connection = mock(Connection.class); |
| 319 | + ChannelProxy channel = mock(ChannelProxy.class); |
| 320 | + Channel rabbitChannel1 = mock(Channel.class); |
| 321 | + Channel rabbitChannel2 = mock(Channel.class); |
| 322 | + AtomicReference<Channel> target = new AtomicReference<>(rabbitChannel1); |
| 323 | + willAnswer(inv -> { |
| 324 | + return target.get(); |
| 325 | + }).given(channel).getTargetChannel(); |
| 326 | + |
| 327 | + given(connectionFactory.createConnection()).willReturn(connection); |
| 328 | + given(connection.createChannel(anyBoolean())).willReturn(channel); |
| 329 | + given(channel.isOpen()).willReturn(true); |
| 330 | + given(channel.queueDeclarePassive(Mockito.anyString())) |
| 331 | + .willAnswer(invocation -> mock(AMQP.Queue.DeclareOk.class)); |
| 332 | + AtomicReference<Consumer> consumer = new AtomicReference<>(); |
| 333 | + final CountDownLatch latch1 = new CountDownLatch(1); |
| 334 | + final CountDownLatch latch2 = new CountDownLatch(1); |
| 335 | + willAnswer(inv -> { |
| 336 | + consumer.set(inv.getArgument(6)); |
| 337 | + latch1.countDown(); |
| 338 | + return "consumerTag"; |
| 339 | + }).given(channel).basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), |
| 340 | + anyMap(), any(Consumer.class)); |
| 341 | + |
| 342 | + willAnswer(inv -> { |
| 343 | + consumer.get().handleCancelOk("consumerTag"); |
| 344 | + latch2.countDown(); |
| 345 | + return null; |
| 346 | + }).given(channel).basicCancel("consumerTag"); |
| 347 | + |
| 348 | + DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory); |
| 349 | + container.setQueueNames("test"); |
| 350 | + container.setPrefetchCount(2); |
| 351 | + container.setMonitorInterval(100); |
| 352 | + container.setMessageListener(msg -> { |
| 353 | + target.set(rabbitChannel2); |
| 354 | + }); |
| 355 | + container.setAcknowledgeMode(AcknowledgeMode.MANUAL); |
| 356 | + container.afterPropertiesSet(); |
| 357 | + container.start(); |
| 358 | + |
| 359 | + assertTrue(latch1.await(10, TimeUnit.SECONDS)); |
| 360 | + consumer.get().handleDelivery("consumerTag", envelope(1L), new BasicProperties(), new byte[1]); |
| 361 | + assertTrue(latch2.await(10, TimeUnit.SECONDS)); |
| 362 | + container.stop(); |
| 363 | + } |
| 364 | + |
314 | 365 | private Envelope envelope(long tag) {
|
315 | 366 | return new Envelope(tag, false, "", "");
|
316 | 367 | }
|
|
0 commit comments