GH-2588: ARBP support batch recoverable #2888

Merged 4 commits on Feb 5, 2024
@@ -451,7 +451,10 @@ AfterRollbackProcessor<String, String> processor =
When you do not use transactions, you can achieve similar functionality by configuring a `DefaultErrorHandler`.
See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Handlers].

IMPORTANT: Recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing.
Starting with version 3.2, the framework can recover (skip) an entire batch of records that keeps failing.
Set `ContainerProperties.setBatchRecoverAfterRollback(true)` to enable this feature.

IMPORTANT: By default, recovery is not possible with a batch listener, since the framework has no knowledge of which record in the batch keeps failing.
In such cases, the application listener must handle a record that keeps failing.
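As a sketch of how the new property might be wired up (not part of this PR's docs; the `consumerFactory` bean and the recoverer lambda below are assumptions for illustration):

```java
// Illustrative only: enable whole-batch recovery after a transaction rollback.
ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory); // assumed bean
factory.setBatchListener(true);
// New in 3.2: recover (skip) the whole batch once the back-off is exhausted.
factory.getContainerProperties().setBatchRecoverAfterRollback(true);
// The recoverer is invoked for each record in the recovered batch.
factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
        (rec, ex) -> System.err.println("Skipping " + rec.topic() + "@" + rec.offset()),
        new FixedBackOff(1000L, 2L)));
```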

See also xref:kafka/annotation-error-handling.adoc#dead-letters[Publishing Dead-letter Records].
@@ -26,4 +26,10 @@ See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more inf
It is now possible to redirect messages to custom DLTs based on the type of the exception thrown during message processing.
Rules for the redirection are set either via the `RetryableTopic.exceptionBasedDltRouting` or the `RetryTopicConfigurationBuilder.dltRoutingRules`.
Custom DLTs are created automatically as well as other retry and dead-letter topics.
See xref:retrytopic/features.adoc#exc-based-custom-dlt-routing[Routing of messages to custom DLTs based on thrown exceptions] for more information.

[[x32-after-rollback-processing]]
=== After Rollback Processing

A new `AfterRollbackProcessor` API method, `processBatch`, is provided.
See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Processor] for more information.
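The design keeps backward compatibility by giving `processBatch` a default implementation that delegates to the existing per-record-list `process` method. A simplified, self-contained sketch of that shape (the real interface also takes `Consumer`, `MessageListenerContainer`, the exception, `recoverable`, and `EOSMode`; the names below are stand-ins):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the AfterRollbackProcessor contract.
interface BatchCapableProcessor<T> {

    void process(List<T> recordList);

    // Default delegation: existing implementations keep working unchanged,
    // while containers can call the batch variant unconditionally.
    default void processBatch(Iterable<T> records, List<T> recordList) {
        process(recordList);
    }
}

// An implementation that only overrides process() still receives the
// whole batch via the default processBatch().
class CollectingProcessor implements BatchCapableProcessor<String> {

    final List<String> seen = new ArrayList<>();

    @Override
    public void process(List<String> recordList) {
        seen.addAll(recordList);
    }
}
```

This is exactly how pre-3.2 processors continue to work: only implementations that opt in (such as `DefaultAfterRollbackProcessor` in this PR) override the batch variant.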
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.springframework.kafka.listener.ContainerProperties.EOSMode;

@@ -34,6 +35,7 @@
* @param <V> the value type.
*
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 1.3.5
*
@@ -63,6 +65,26 @@ public interface AfterRollbackProcessor<K, V> {
void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode);

/**
* Process the entire batch of records; {@code recoverable} will be true if the
* container is configured to recover the whole batch after a rollback.
* @param records the complete set of consumer records.
* @param recordList the records as a list.
* @param consumer the consumer.
* @param container the container.
* @param exception the exception.
* @param recoverable true if recovery of the entire batch is enabled.
* @param eosMode the {@link EOSMode}.
* @since 3.2
* @see #isProcessInTransaction()
*/
default void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,
Consumer<K, V> consumer, MessageListenerContainer container, Exception exception,
boolean recoverable, ContainerProperties.EOSMode eosMode) {

process(recordList, consumer, container, exception, recoverable, eosMode);
}

/**
* Optional method to clear thread state; will be called just before a consumer
* thread terminates.
@@ -26,6 +26,7 @@
* failed.
*
* @author Gary Russell
* @author Wang Zhiyang
* @since 2.5
*
*/
@@ -98,9 +99,9 @@ public int getIndex() {

@Override
public String getMessage() {
return super.getMessage() + (this.record != null
return super.getMessage() + " " + (this.record != null
? (this.record.topic() + "-" + this.record.partition() + "@" + this.record.offset())
: (" @-" + this.index));
: ("@-" + this.index));
}

}
@@ -51,6 +51,7 @@
* @author Johnny Lim
* @author Lukasz Kaminski
* @author Kyuhyeok Park
* @author Wang Zhiyang
*/
public class ContainerProperties extends ConsumerProperties {

@@ -258,6 +259,8 @@ public enum EOSMode {

private PlatformTransactionManager transactionManager;

private boolean batchRecoverAfterRollback = false;

private int monitorInterval = DEFAULT_MONITOR_INTERVAL;

private TaskScheduler scheduler;
@@ -543,6 +546,24 @@ public void setTransactionManager(@Nullable PlatformTransactionManager transacti
this.transactionManager = transactionManager;
}

/**
* Whether to recover the entire batch of records after a rollback.
* @return true to recover the whole batch.
* @since 3.2
*/
public boolean isBatchRecoverAfterRollback() {
return this.batchRecoverAfterRollback;
}

/**
* Enable recovery of the entire batch of records after a rollback.
* @param batchRecoverAfterRollback the batchRecoverAfterRollback to set.
* @since 3.2
*/
public void setBatchRecoverAfterRollback(boolean batchRecoverAfterRollback) {
this.batchRecoverAfterRollback = batchRecoverAfterRollback;
}

public int getMonitorInterval() {
return this.monitorInterval;
}
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,16 +17,19 @@
package org.springframework.kafka.listener;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
import org.springframework.lang.Nullable;
@@ -47,6 +50,7 @@
*
* @author Gary Russell
* @author Francois Rosiere
* @author Wang Zhiyang
*
* @since 1.3.5
*
@@ -60,7 +64,9 @@ public class DefaultAfterRollbackProcessor<K, V> extends FailedRecordProcessor

private final BackOff backOff;

private KafkaOperations<?, ?> kafkaTemplate;
private final KafkaOperations<?, ?> kafkaTemplate;

private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer;

/**
* Construct an instance with the default recoverer which simply logs the record after
@@ -143,6 +149,11 @@ public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>,
super.setCommitRecovered(commitRecovered);
checkConfig();
this.backOff = backOff;
this.recoverer = (crs, ex) -> {
if (recoverer != null && !crs.isEmpty()) {
crs.spliterator().forEachRemaining(rec -> recoverer.accept(rec, ex));
}
};
}

private void checkConfig() {
@@ -176,6 +187,53 @@ && isCommitRecovered() && this.kafkaTemplate.isTransactional()) {

}

@SuppressWarnings({ "unchecked", "rawtypes"})
@Override
public void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList, Consumer<K, V> consumer,
@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {

if (recoverable && isCommitRecovered()) {
long nextBackOff = ListenerUtils.nextBackOff(this.backOff, this.backOffs);
if (nextBackOff != BackOffExecution.STOP) {
SeekUtils.doSeeksToBegin((List) recordList, consumer, this.logger);
try {
ListenerUtils.stoppableSleep(container, nextBackOff);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return;
}

try {
this.recoverer.accept(records, exception);
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
records.forEach(rec -> offsets.put(new TopicPartition(rec.topic(), rec.partition()),
ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
if (offsets.size() > 0 && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
this.kafkaTemplate.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
}
clearThreadState();
}
catch (Exception ex) {
SeekUtils.doSeeksToBegin((List) recordList, consumer, this.logger);
logger.error(ex, "Recoverer threw an exception; re-seeking batch");
throw ex;
}
return;
}

try {
process(recordList, consumer, container, exception, false, eosMode);
}
catch (KafkaException ke) {
ke.selfLog("AfterRollbackProcessor threw an exception", this.logger);
}
catch (Exception ex) {
this.logger.error(ex, "AfterRollbackProcessor threw an exception");
}
}

@Override
public boolean isProcessInTransaction() {
return isCommitRecovered();
@@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -662,6 +663,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final boolean wantsFullRecords;

private final boolean wantsBatchRecoverAfterRollback;

private final boolean asyncReplies;

private final boolean autoCommit;
@@ -888,6 +891,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

this.clientId = determineClientId();
this.transactionTemplate = determineTransactionTemplate();
this.wantsBatchRecoverAfterRollback = this.containerProperties.isBatchRecoverAfterRollback();
this.genericListener = listener;
this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
@@ -2195,38 +2199,26 @@ private void batchRollback(final ConsumerRecords<K, V> records,

@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
afterRollbackProcessorToUse.processBatch(records,
Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)),
ListenerConsumer.this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer, e,
ListenerConsumer.this.wantsBatchRecoverAfterRollback, ListenerConsumer.this.eosMode);
}

});
}
else {
batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
}
}

private void batchAfterRollback(final ConsumerRecords<K, V> records,
@Nullable final List<ConsumerRecord<K, V>> recordList, RuntimeException rollbackException,
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {

try {
if (recordList == null) {
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, false,
this.eosMode);
try {
afterRollbackProcessorToUse.processBatch(records,
Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)), this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer, e,
this.wantsBatchRecoverAfterRollback, this.eosMode);
}
else {
afterRollbackProcessorToUse.process(recordList, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, false,
this.eosMode);
catch (Exception ex) {
this.logger.error(ex, "AfterRollbackProcessor threw exception");
}
}
catch (KafkaException ke) {
ke.selfLog("AfterRollbackProcessor threw an exception", this.logger);
}
catch (Exception ex) {
this.logger.error(ex, "AfterRollbackProcessor threw an exception");
}
}

private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V> records) {
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
* @author Gary Russell
* @author Francois Rosiere
* @author Antonio Tomac
* @author Wang Zhiyang
* @since 2.0
*
*/
@@ -126,12 +127,7 @@ public static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExec
Map<Thread, Long> lastIntervals, MessageListenerContainer container) throws InterruptedException {

Thread currentThread = Thread.currentThread();
BackOffExecution backOffExecution = executions.get(currentThread);
if (backOffExecution == null) {
backOffExecution = backOff.start();
executions.put(currentThread, backOffExecution);
}
Long interval = backOffExecution.nextBackOff();
Long interval = nextBackOff(backOff, executions);
if (interval == BackOffExecution.STOP) {
interval = lastIntervals.get(currentThread);
if (interval == null) {
@@ -144,6 +140,17 @@
}
}

static long nextBackOff(BackOff backOff, Map<Thread, BackOffExecution> executions) {

Thread currentThread = Thread.currentThread();
BackOffExecution backOffExecution = executions.get(currentThread);
if (backOffExecution == null) {
backOffExecution = backOff.start();
executions.put(currentThread, backOffExecution);
}
return backOffExecution.nextBackOff();
}
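The extracted helper caches one `BackOffExecution` per consumer thread, so each thread advances its own back-off sequence independently. A self-contained sketch of that behavior (the `BackOff`/`BackOffExecution` interfaces below are minimal stand-ins for Spring's contracts, and `computeIfAbsent` replaces the get/put pair without changing behavior):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal stand-ins for Spring's back-off contracts (assumption: simplified).
interface BackOffExecution {
    long STOP = -1;
    long nextBackOff();
}

interface BackOff {
    BackOffExecution start();
}

class BackOffDemo {

    // One BackOffExecution per consumer thread, as in ListenerUtils.nextBackOff().
    static final Map<Thread, BackOffExecution> executions = new ConcurrentHashMap<>();

    static long nextBackOff(BackOff backOff, Map<Thread, BackOffExecution> executions) {
        // Start a new execution the first time this thread backs off,
        // then reuse it so the sequence advances across calls.
        return executions.computeIfAbsent(Thread.currentThread(), t -> backOff.start())
                .nextBackOff();
    }

    public static void main(String[] args) {
        // Fixed 100ms back-off, at most 2 attempts, then STOP.
        BackOff backOff = () -> new BackOffExecution() {
            private int attempts;

            public long nextBackOff() {
                return ++attempts <= 2 ? 100L : STOP;
            }
        };
        long first = nextBackOff(backOff, executions);
        long second = nextBackOff(backOff, executions);
        long third = nextBackOff(backOff, executions);
        System.out.println(first + " " + second + " " + third); // 100 100 -1
    }
}
```

In `processBatch`, a `STOP` result from this helper is what switches the processor from re-seeking the batch to recovering (skipping) it.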

/**
* Sleep for the desired timeout, as long as the container continues to run.
* @param container the container.