Skip to content

Commit 8886769

Browse files
committed
fix review and fix test bug at DefaultAfterRollbackProcessorTests
* add @test to DefaultAfterRollbackProcessorTests.testNoEarlyExitBackOff * polish TransactionalContainerTests * fix bug Tests at DefaultAfterRollbackProcessorTests method `testNoEarlyExitBackOff` and testEarlyExitBackOff
1 parent 58528e0 commit 8886769

File tree

2 files changed

+22
-10
lines changed

2 files changed

+22
-10
lines changed

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java

+21-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2023 the original author or authors.
2+
* Copyright 2019-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -54,12 +54,12 @@
5454
* @author Gary Russell
5555
* @author Francois Rosiere
5656
* @author Wang Zhiyang
57+
*
5758
* @since 2.3.1
5859
*
5960
*/
6061
public class DefaultAfterRollbackProcessorTests {
6162

62-
@SuppressWarnings("deprecation")
6363
@Test
6464
void testClassifier() {
6565
AtomicReference<ConsumerRecord<?, ?>> recovered = new AtomicReference<>();
@@ -142,10 +142,27 @@ void testBackOffNoBatchRecover() {
142142
verify(backOff, times(3)).start();
143143
}
144144

145+
@Test
146+
void testEarlyExitBackOff() {
147+
DefaultAfterRollbackProcessor<String, String> processor = new DefaultAfterRollbackProcessor<>(
148+
new FixedBackOff(10_000, 1));
149+
@SuppressWarnings("unchecked")
150+
Consumer<String, String> consumer = mock(Consumer.class);
151+
ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
152+
ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar");
153+
List<ConsumerRecord<String, String>> records = Arrays.asList(record1, record2);
154+
IllegalStateException illegalState = new IllegalStateException();
155+
MessageListenerContainer container = mock(MessageListenerContainer.class);
156+
given(container.isRunning()).willReturn(false);
157+
long t1 = System.currentTimeMillis();
158+
processor.process(records, consumer, container, illegalState, true, EOSMode.V2);
159+
assertThat(System.currentTimeMillis() < t1 + 5_000).isTrue();
160+
}
161+
145162
@Test
146163
void testNoEarlyExitBackOff() {
147164
DefaultAfterRollbackProcessor<String, String> processor = new DefaultAfterRollbackProcessor<>(
148-
new FixedBackOff(1, 200));
165+
new FixedBackOff(200, 1));
149166
@SuppressWarnings("unchecked")
150167
Consumer<String, String> consumer = mock(Consumer.class);
151168
ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
@@ -156,7 +173,7 @@ void testNoEarlyExitBackOff() {
156173
given(container.isRunning()).willReturn(true);
157174
long t1 = System.currentTimeMillis();
158175
processor.process(records, consumer, container, illegalState, true, EOSMode.V2);
159-
assertThat(System.currentTimeMillis() >= t1 + 200);
176+
assertThat(System.currentTimeMillis() >= t1 + 200).isTrue();
160177
}
161178

162179
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -788,8 +788,6 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
788788
@SuppressWarnings({ "unchecked"})
789789
@Test
790790
public void testBatchListenerMaxFailuresOnRecover() throws Exception {
791-
792-
logger.info("Start testBatchListenerMaxFailures");
793791
String group = "groupInARBP2";
794792
Map<String, Object> props = KafkaTestUtils.consumerProps(group, "false", embeddedKafka);
795793
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
@@ -904,7 +902,6 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
904902
verify(dlTemplate).sendOffsetsToTransaction(
905903
eq(Collections.singletonMap(new TopicPartition(topic8, 0), new OffsetAndMetadata(4L))),
906904
any(ConsumerGroupMetadata.class));
907-
logger.info("Stop testBatchListenerMaxFailures");
908905
}
909906

910907
@SuppressWarnings("unchecked")
@@ -973,7 +970,6 @@ public void testRollbackProcessorCrash() throws Exception {
973970
@SuppressWarnings("unchecked")
974971
@Test
975972
public void testBatchListenerRecoverAfterRollbackProcessorCrash() throws Exception {
976-
logger.info("Start testBatchListenerRollbackNoRetries");
977973
Map<String, Object> props = KafkaTestUtils.consumerProps("testBatchListenerRollbackNoRetries", "false", embeddedKafka);
978974
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
979975
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
@@ -1041,10 +1037,9 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
10411037
container.stop();
10421038
pf.destroy();
10431039
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
1044-
logger.info("Stop testRollbackNoRetries");
10451040
}
10461041

1047-
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
1042+
@SuppressWarnings({ "rawtypes", "unchecked" })
10481043
@Test
10491044
void testNoAfterRollbackWhenFenced() throws Exception {
10501045
Consumer consumer = mock(Consumer.class);

0 commit comments

Comments
 (0)