
Commit 03dc8c1

1 parent 4ecc787 commit 03dc8c1

4 files changed (+361 -132 lines changed)
@@ -0,0 +1,143 @@
/*
 * Copyright 2021 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.listener;

import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.SerializationException;

import org.springframework.lang.Nullable;
import org.springframework.util.backoff.BackOff;

/**
 * An error handler that, for record listeners, seeks to the current offset for each topic
 * in the remaining records. Used to rewind partitions after a message failure so that it
 * can be replayed. For batch listeners, seeks to the current offset for each topic in a
 * batch of records. Used to rewind partitions after a message failure so that the batch
 * can be replayed. If the listener throws a {@link BatchListenerFailedException}
 * identifying the failed record, the records before that record will have their offsets
 * committed and the partitions for the remaining records will be repositioned, and/or
 * the failed record can be recovered and skipped. If some other exception is thrown, or
 * a valid record is not provided in the exception, error handling is delegated to a
 * {@link SeekToCurrentBatchErrorHandler} with this handler's {@link BackOff}. If the
 * record is recovered, its offset is committed.
 *
 * @author Gary Russell
 * @since 2.8
 */
public class DefaultErrorHandler extends FailedRecordProcessor implements CommonErrorHandler {

	private boolean ackAfterHandle = true;

	/**
	 * Construct an instance with the default recoverer which simply logs the record after
	 * {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
	 * topic/partition/offset, with the default back off (9 retries, no delay).
	 */
	public DefaultErrorHandler() {
		this(null, SeekUtils.DEFAULT_BACK_OFF);
	}

	/**
	 * Construct an instance with the default recoverer which simply logs the record after
	 * the backOff returns STOP for a topic/partition/offset.
	 * @param backOff the {@link BackOff}.
	 */
	public DefaultErrorHandler(BackOff backOff) {
		this(null, backOff);
	}

	/**
	 * Construct an instance with the provided recoverer which will be called after
	 * {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
	 * topic/partition/offset.
	 * @param recoverer the recoverer.
	 */
	public DefaultErrorHandler(ConsumerRecordRecoverer recoverer) {
		this(recoverer, SeekUtils.DEFAULT_BACK_OFF);
	}

	/**
	 * Construct an instance with the provided recoverer which will be called after
	 * the backOff returns STOP for a topic/partition/offset.
	 * @param recoverer the recoverer; if null, the default (logging) recoverer is used.
	 * @param backOff the {@link BackOff}.
	 */
	public DefaultErrorHandler(@Nullable ConsumerRecordRecoverer recoverer, BackOff backOff) {
		super(recoverer, backOff);
	}

	/**
	 * {@inheritDoc}
	 * The container must be configured with
	 * {@link org.springframework.kafka.listener.ContainerProperties.AckMode#MANUAL_IMMEDIATE}.
	 * Whether the commit is sync or async depends on the container's syncCommits
	 * property.
	 * @param commitRecovered true to commit.
	 */
	@Override
	public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced javadoc
		super.setCommitRecovered(commitRecovered);
	}

	@Override
	public boolean isAckAfterHandle() {
		return this.ackAfterHandle;
	}

	@Override
	public void setAckAfterHandle(boolean ackAfterHandle) {
		this.ackAfterHandle = ackAfterHandle;
	}

	@Override
	public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
			Consumer<?, ?> consumer, MessageListenerContainer container) {

		SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
				getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
	}

	@Override
	public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
			MessageListenerContainer container, Runnable invokeListener) {

		CommonErrorHandler.super.handleBatch(thrownException, data, consumer, container, invokeListener);
	}

	@Override
	public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
			MessageListenerContainer container) {

		if (thrownException instanceof SerializationException) {
			throw new IllegalStateException("This error handler cannot process 'SerializationException's directly; "
					+ "please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key "
					+ "deserializer", thrownException);
		}
		else {
			throw new IllegalStateException("This error handler cannot process '"
					+ thrownException.getClass().getName()
					+ "'s; no record information is available", thrownException);
		}
	}

}
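
A minimal configuration sketch of how this new handler might be wired into an application (not part of this commit; the bean method, topic-free setup, recoverer choice, and FixedBackOff values below are illustrative assumptions, using the existing setCommonErrorHandler and DeadLetterPublishingRecoverer APIs):

// Illustrative sketch only; assumes spring-kafka 2.8 APIs and an available KafkaTemplate bean.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlingConfig {

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
			ConsumerFactory<String, String> consumerFactory, KafkaTemplate<String, String> template) {

		ConcurrentKafkaListenerContainerFactory<String, String> factory =
				new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory);
		// After 2 retries, 1 second apart, publish the failed record to a dead-letter topic.
		DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
		factory.setCommonErrorHandler(new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L)));
		return factory;
	}

}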
@@ -0,0 +1,179 @@
/*
 * Copyright 2021 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.KafkaException;
import org.springframework.lang.Nullable;
import org.springframework.util.backoff.BackOff;

/**
 * Base class for error handlers that process a failed batch: when the listener throws a
 * {@link BatchListenerFailedException}, the offsets of the records before the failed
 * record are committed and the remaining records are re-sought or recovered; otherwise
 * handling is delegated to the fallback {@link CommonErrorHandler}.
 *
 * @author Gary Russell
 * @since 2.7
 */
public abstract class FailedBatchProcessor extends FailedRecordProcessor {

	private static final LoggingCommitCallback LOGGING_COMMIT_CALLBACK = new LoggingCommitCallback();

	private final CommonErrorHandler fallbackHandler;

	/**
	 * Construct an instance with the provided properties.
	 * @param recoverer the recoverer.
	 * @param backOff the back off.
	 * @param fallbackHandler the handler to use when no {@link BatchListenerFailedException} is found.
	 */
	public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
			CommonErrorHandler fallbackHandler) {

		super(recoverer, backOff);
		this.fallbackHandler = fallbackHandler;
	}

	protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
			MessageListenerContainer container, Runnable invokeListener) {

		BatchListenerFailedException batchListenerFailedException = getBatchListenerFailedException(thrownException);
		if (batchListenerFailedException == null) {
			this.logger.debug(thrownException, "Expected a BatchListenerFailedException; re-seeking batch");
			this.fallbackHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
		}
		else {
			ConsumerRecord<?, ?> record = batchListenerFailedException.getRecord();
			int index = record != null ? findIndex(data, record) : batchListenerFailedException.getIndex();
			if (index < 0 || index >= data.count()) {
				this.logger.warn(batchListenerFailedException, () ->
						String.format("Record not found in batch: %s-%d@%d; re-seeking batch",
								record.topic(), record.partition(), record.offset()));
				this.fallbackHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
			}
			else {
				seekOrRecover(thrownException, data, consumer, container, index);
			}
		}
	}

	private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
		if (record == null) {
			return -1;
		}
		int i = 0;
		Iterator<?> iterator = data.iterator();
		while (iterator.hasNext()) {
			ConsumerRecord<?, ?> candidate = (ConsumerRecord<?, ?>) iterator.next();
			if (candidate.topic().equals(record.topic()) && candidate.partition() == record.partition()
					&& candidate.offset() == record.offset()) {
				break;
			}
			i++;
		}
		return i;
	}

	private void seekOrRecover(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
			MessageListenerContainer container, int indexArg) {

		if (data == null) {
			return;
		}
		Iterator<?> iterator = data.iterator();
		List<ConsumerRecord<?, ?>> toCommit = new ArrayList<>();
		List<ConsumerRecord<?, ?>> remaining = new ArrayList<>();
		int index = indexArg;
		while (iterator.hasNext()) {
			ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) iterator.next();
			if (index-- > 0) {
				toCommit.add(record);
			}
			else {
				remaining.add(record);
			}
		}
		Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
		toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()),
				(key, val) -> new OffsetAndMetadata(rec.offset() + 1)));
		if (offsets.size() > 0) {
			commit(consumer, container, offsets);
		}
		if (remaining.size() > 0) {
			SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
					getRecoveryStrategy(remaining, thrownException), this.logger, getLogLevel());
			ConsumerRecord<?, ?> recovered = remaining.get(0);
			commit(consumer, container,
					Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
							new OffsetAndMetadata(recovered.offset() + 1)));
			if (remaining.size() > 1) {
				throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
			}
		}
	}

	private void commit(Consumer<?, ?> consumer, MessageListenerContainer container,
			Map<TopicPartition, OffsetAndMetadata> offsets) {

		boolean syncCommits = container.getContainerProperties().isSyncCommits();
		Duration timeout = container.getContainerProperties().getSyncCommitTimeout();
		if (syncCommits) {
			consumer.commitSync(offsets, timeout);
		}
		else {
			OffsetCommitCallback commitCallback = container.getContainerProperties().getCommitCallback();
			if (commitCallback == null) {
				commitCallback = LOGGING_COMMIT_CALLBACK;
			}
			consumer.commitAsync(offsets, commitCallback);
		}
	}

	@Nullable
	private BatchListenerFailedException getBatchListenerFailedException(Throwable throwableArg) {
		if (throwableArg == null || throwableArg instanceof BatchListenerFailedException) {
			return (BatchListenerFailedException) throwableArg;
		}

		BatchListenerFailedException target = null;

		Throwable throwable = throwableArg;
		Set<Throwable> checked = new HashSet<>();
		while (throwable.getCause() != null && !checked.contains(throwable.getCause())) {
			throwable = throwable.getCause();
			checked.add(throwable);

			if (throwable instanceof BatchListenerFailedException) {
				target = (BatchListenerFailedException) throwable;
				break;
			}
		}

		return target;
	}

}
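
To make the commit/seek split in doHandle concrete, here is a minimal sketch (not part of this commit; the topic, group id, and processing logic are hypothetical) of a batch listener that reports the failing record via BatchListenerFailedException, so that the records before it are committed and the rest of the batch is re-sought or recovered:

// Illustrative sketch only; assumes a container factory configured for batch listening.
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.stereotype.Component;

@Component
public class OrderBatchListener {

	@KafkaListener(topics = "orders", groupId = "order-batch")
	public void onBatch(List<ConsumerRecord<String, String>> records) {
		for (ConsumerRecord<String, String> record : records) {
			try {
				process(record); // hypothetical business logic
			}
			catch (Exception ex) {
				// Identify the failed record; the batch processor then commits the offsets of
				// the records before it and seeks (or recovers) from this record onwards.
				throw new BatchListenerFailedException("Failed to process order", ex, record);
			}
		}
	}

	private void process(ConsumerRecord<String, String> record) {
		// hypothetical processing
	}

}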
