|
24 | 24 | import static org.mockito.Mockito.mock;
|
25 | 25 | import static org.mockito.Mockito.never;
|
26 | 26 |
|
| 27 | +import java.lang.reflect.Method; |
27 | 28 | import java.time.Clock;
|
28 | 29 | import java.util.List;
|
29 | 30 | import java.util.Map;
|
|
40 | 41 | import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
41 | 42 | import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
|
42 | 43 | import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
|
| 44 | +import org.springframework.kafka.listener.KafkaConsumerTimingAdjuster; |
43 | 45 | import org.springframework.kafka.listener.ListenerContainerRegistry;
|
44 | 46 | import org.springframework.kafka.listener.PartitionPausingBackOffManagerFactory;
|
45 | 47 | import org.springframework.kafka.support.converter.ConversionException;
|
| 48 | +import org.springframework.kafka.test.utils.KafkaTestUtils; |
46 | 49 | import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
47 | 50 | import org.springframework.util.backoff.BackOff;
|
48 | 51 |
|
@@ -213,6 +216,38 @@ protected RetryTopicComponentFactory createComponentFactory() {
|
213 | 216 | then(factory).should(never()).setTaskExecutor(taskExecutorMock);
|
214 | 217 | }
|
215 | 218 |
|
| 219 | + @Test |
| 220 | + void backOffManagerFactoryCoverage() throws Exception { |
| 221 | + Method create = PartitionPausingBackOffManagerFactory.class.getDeclaredMethod("doCreateManager", |
| 222 | + ListenerContainerRegistry.class); |
| 223 | + create.setAccessible(true); |
| 224 | + TaskExecutor te = mock(TaskExecutor.class); |
| 225 | + KafkaConsumerTimingAdjuster mock = mock(KafkaConsumerTimingAdjuster.class); |
| 226 | + PartitionPausingBackOffManagerFactory factory = new PartitionPausingBackOffManagerFactory(mock); |
| 227 | + assertThat(KafkaTestUtils.getPropertyValue(factory, "clock")).isEqualTo(Clock.systemUTC()); |
| 228 | + assertThat(KafkaTestUtils.getPropertyValue(factory, "timingAdjustmentManager")).isEqualTo(mock); |
| 229 | + create.invoke(factory, mock(ListenerContainerRegistry.class)); |
| 230 | + factory.setTaskExecutor(te); |
| 231 | + assertThat(KafkaTestUtils.getPropertyValue(factory, "taskExecutor")).isEqualTo(te); |
| 232 | + factory = new PartitionPausingBackOffManagerFactory(te); |
| 233 | + assertThat(KafkaTestUtils.getPropertyValue(factory, "clock")).isEqualTo(Clock.systemUTC()); |
| 234 | + assertThat(KafkaTestUtils.getPropertyValue(factory, "taskExecutor")).isEqualTo(te); |
| 235 | + create.invoke(factory, mock(ListenerContainerRegistry.class)); |
| 236 | + factory.setTimingAdjustmentManager(mock); |
| 237 | + assertThat(KafkaTestUtils.getPropertyValue(factory, "timingAdjustmentManager")).isEqualTo(mock); |
| 238 | + create.invoke(factory, mock(ListenerContainerRegistry.class)); |
| 239 | + factory = new PartitionPausingBackOffManagerFactory(false); |
| 240 | + assertThat(KafkaTestUtils.getPropertyValue(factory, "clock")).isEqualTo(Clock.systemUTC()); |
| 241 | + assertThat(KafkaTestUtils.getPropertyValue(factory, "timingAdjustmentEnabled")).isEqualTo(Boolean.FALSE); |
| 242 | + factory.setClock(Clock.systemDefaultZone()); |
| 243 | + assertThat(KafkaTestUtils.getPropertyValue(factory, "clock")).isEqualTo(Clock.systemDefaultZone()); |
| 244 | + factory = new PartitionPausingBackOffManagerFactory(Clock.systemDefaultZone()); |
| 245 | + assertThat(KafkaTestUtils.getPropertyValue(factory, "clock")).isEqualTo(Clock.systemDefaultZone()); |
| 246 | + assertThat(KafkaTestUtils.getPropertyValue(factory, "timingAdjustmentEnabled")).isEqualTo(Boolean.TRUE); |
| 247 | + factory.setTimingAdjustmentEnabled(false); |
| 248 | + assertThat(KafkaTestUtils.getPropertyValue(factory, "timingAdjustmentEnabled")).isEqualTo(Boolean.FALSE); |
| 249 | + } |
| 250 | + |
216 | 251 | @Test
|
217 | 252 | void testCreatesTaskExecutor() {
|
218 | 253 | RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport();
|
|
0 commit comments