Skip to content

Commit 2306071

Browse files
author
vooft
committed
Example of lost records using RetryingBatchErrorHandler
1 parent 3e3b3a6 commit 2306071

File tree

1 file changed

+91
-91
lines changed

1 file changed

+91
-91
lines changed

src/test/java/com/example/kafkaissue/KafkaIssueTest.java

+91-91
Original file line numberDiff line numberDiff line change
@@ -43,105 +43,105 @@
4343
@EmbeddedKafka(topics = {KafkaIssueTest.TOPIC})
4444
class KafkaIssueTest {
4545

46-
private static final Logger log = LoggerFactory.getLogger(KafkaIssueTest.class);
46+
private static final Logger log = LoggerFactory.getLogger(KafkaIssueTest.class);
4747

4848
public static final String TOPIC = "test-topic";
49-
public static final String GROUP = "test-group";
50-
51-
private static final Duration TIMEOUT = Duration.ofMillis(50);
52-
53-
private static final int TOTAL_RECORDS = 50;
54-
private static final Set<String> REMAINING = Collections.synchronizedSet(new HashSet<>());
55-
private static final AtomicBoolean SPRING_KAFKA_LISTENER_FAILURE_FLAG = new AtomicBoolean(true);
56-
57-
@Value("${spring.embedded.kafka.brokers}")
58-
private String brokerAddresses;
59-
60-
@Autowired
61-
private KafkaTemplate<String, String> kafkaTemplate;
62-
63-
@Configuration
64-
static class TestConfig {
65-
@KafkaListener(topics = TOPIC, groupId = GROUP, containerFactory = "testContainerFactory")
66-
public void listen(List<ConsumerRecord<String, String>> batch) {
67-
if (SPRING_KAFKA_LISTENER_FAILURE_FLAG.get()) {
68-
throw new RuntimeException("just failing");
69-
} else {
70-
for (ConsumerRecord<String, String> record : batch) {
71-
log.info("Processed by spring kafka listener {}", record.value());
72-
REMAINING.remove(record.value());
73-
}
74-
}
75-
}
76-
}
77-
78-
@Configuration
79-
static class TestContainerFactoryConfig {
80-
@Bean
81-
public ConcurrentKafkaListenerContainerFactory<String, Object> testContainerFactory(KafkaProperties kafkaProperties) {
82-
final var consumerFactory = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
83-
84-
final var containerFactory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
85-
containerFactory.setConsumerFactory(consumerFactory);
86-
containerFactory.setBatchListener(true);
87-
88-
final var backoff = new FixedBackOff(250, Long.MAX_VALUE); // may need to reduce the backoff if not failing
89-
90-
final var errorHandler = new RetryingBatchErrorHandler(backoff, (ignore1, ignore2) -> {
91-
throw new RuntimeException("boo");
92-
});
93-
94-
// will work if replaced with SeekToCurrentBatchErrorHandler
49+
public static final String GROUP = "test-group";
50+
51+
private static final Duration TIMEOUT = Duration.ofMillis(50);
52+
53+
private static final int TOTAL_RECORDS = 50;
54+
private static final Set<String> REMAINING = Collections.synchronizedSet(new HashSet<>());
55+
private static final AtomicBoolean SPRING_KAFKA_LISTENER_FAILURE_FLAG = new AtomicBoolean(true);
56+
57+
@Value("${spring.embedded.kafka.brokers}")
58+
private String brokerAddresses;
59+
60+
@Autowired
61+
private KafkaTemplate<String, String> kafkaTemplate;
62+
63+
@Configuration
64+
static class TestConfig {
65+
@KafkaListener(topics = TOPIC, groupId = GROUP, containerFactory = "testContainerFactory")
66+
public void listen(List<ConsumerRecord<String, String>> batch) {
67+
if (SPRING_KAFKA_LISTENER_FAILURE_FLAG.get()) {
68+
throw new RuntimeException("just failing");
69+
} else {
70+
for (ConsumerRecord<String, String> record : batch) {
71+
log.info("Processed by spring kafka listener {}", record.value());
72+
REMAINING.remove(record.value());
73+
}
74+
}
75+
}
76+
}
77+
78+
@Configuration
79+
static class TestContainerFactoryConfig {
80+
@Bean
81+
public ConcurrentKafkaListenerContainerFactory<String, Object> testContainerFactory(KafkaProperties kafkaProperties) {
82+
final var consumerFactory = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
83+
84+
final var containerFactory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
85+
containerFactory.setConsumerFactory(consumerFactory);
86+
containerFactory.setBatchListener(true);
87+
88+
final var backoff = new FixedBackOff(250, Long.MAX_VALUE); // may need to reduce the backoff if not failing
89+
90+
final var errorHandler = new RetryingBatchErrorHandler(backoff, (ignore1, ignore2) -> {
91+
throw new RuntimeException("boo");
92+
});
93+
94+
// will work if replaced with SeekToCurrentBatchErrorHandler
9595
// final var errorHandler = new SeekToCurrentBatchErrorHandler();
9696
// errorHandler.setBackOff(backoff);
97-
containerFactory.setBatchErrorHandler(errorHandler);
98-
return containerFactory;
99-
}
100-
}
97+
containerFactory.setBatchErrorHandler(errorHandler);
98+
return containerFactory;
99+
}
100+
}
101101

102102

103103
@Test
104104
void test() throws Exception {
105-
for (int i = 0; i < TOTAL_RECORDS; i++) {
106-
final var sendResult = kafkaTemplate.send(TOPIC, String.valueOf(i), String.valueOf(i)).get();
107-
log.info("Sent record {}", sendResult.getRecordMetadata());
108-
REMAINING.add(String.valueOf(i));
109-
}
110-
111-
Thread.sleep(1000);
112-
113-
// create second consumer to force rebalancing
114-
final var consumer = createConsumer();
115-
consumer.subscribe(List.of(TOPIC));
116-
117-
int maxEmpty = 30;
118-
while (maxEmpty > 0) {
119-
final var batch = consumer.poll(TIMEOUT);
120-
if (batch.isEmpty()) {
121-
maxEmpty--;
122-
}
123-
for (ConsumerRecord<String, String> record : batch) {
124-
log.info("Received by manual consumer {}", record.value());
125-
REMAINING.remove(record.value());
126-
}
127-
128-
Thread.sleep(TIMEOUT.toMillis());
129-
}
130-
131-
SPRING_KAFKA_LISTENER_FAILURE_FLAG.set(false);
132-
133-
await().atMost(Duration.ofSeconds(10)).until(() -> REMAINING, Set::isEmpty);
105+
for (int i = 0; i < TOTAL_RECORDS; i++) {
106+
final var sendResult = kafkaTemplate.send(TOPIC, String.valueOf(i), String.valueOf(i)).get();
107+
log.info("Sent record {}", sendResult.getRecordMetadata());
108+
REMAINING.add(String.valueOf(i));
109+
}
110+
111+
Thread.sleep(1000);
112+
113+
// create second consumer to force rebalancing
114+
final var consumer = createConsumer();
115+
consumer.subscribe(List.of(TOPIC));
116+
117+
int maxEmpty = 30;
118+
while (maxEmpty > 0) {
119+
final var batch = consumer.poll(TIMEOUT);
120+
if (batch.isEmpty()) {
121+
maxEmpty--;
122+
}
123+
for (ConsumerRecord<String, String> record : batch) {
124+
log.info("Received by manual consumer {}", record.value());
125+
REMAINING.remove(record.value());
126+
}
127+
128+
Thread.sleep(TIMEOUT.toMillis());
129+
}
130+
131+
SPRING_KAFKA_LISTENER_FAILURE_FLAG.set(false);
132+
133+
await().atMost(Duration.ofSeconds(10)).until(() -> REMAINING, Set::isEmpty);
134134
}
135135

136-
private KafkaConsumer<String, String> createConsumer() {
137-
final var props = new Properties();
138-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddresses);
139-
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);
140-
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
141-
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
142-
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
143-
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
144-
145-
return new KafkaConsumer<>(props);
146-
}
136+
private KafkaConsumer<String, String> createConsumer() {
137+
final var props = new Properties();
138+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddresses);
139+
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);
140+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
141+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
142+
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
143+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
144+
145+
return new KafkaConsumer<>(props);
146+
}
147147
}

0 commit comments

Comments
 (0)