Skip to content

Commit d1d7968

Browse files
committed
spring-projectsGH-615: Error Handler Evolution Part 1
Flatten `GenericErrorHandler` into a single interface. Create an adapter for legacy error handlers. See spring-projects#615
1 parent 2cab170 commit d1d7968

File tree

3 files changed

+329
-28
lines changed

3 files changed

+329
-28
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.List;
20+
21+
import org.apache.kafka.clients.consumer.Consumer;
22+
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
import org.apache.kafka.clients.consumer.ConsumerRecords;
24+
25+
import org.springframework.kafka.support.TopicPartitionOffset;
26+
27+
/**
28+
* Replacement for {@link ErrorHandler} and {@link BatchErrorHandler} and their
29+
* sub-interfaces.
30+
*
31+
* @author Gary Russell
32+
* @since 2.7.4
33+
*
34+
*/
35+
public interface CommonErrorHandler extends DeliveryAttemptAware {
36+
37+
/**
38+
* Return true if this error handler is for a batch listener.
39+
* @return true for batch.
40+
*/
41+
default boolean isBatch() {
42+
return false;
43+
}
44+
45+
/**
46+
* Return false if this error handler should only receive the current failed record;
47+
* remaining records will be passed to the listener after the error handler returns.
48+
* When true (default), all remaining records including the failed record are passed
49+
* to the error handler.
50+
* @return false to receive only the failed record.
51+
* @see #handle(Exception, ConsumerRecord, Consumer, MessageListenerContainer)
52+
* @see #handle(Exception, List, Consumer, MessageListenerContainer)
53+
*/
54+
default boolean remainingRecords() {
55+
return true;
56+
}
57+
58+
/**
59+
* Return true if this error handler supports delivery attempts headers.
60+
* @return true if capable.
61+
*/
62+
default boolean deliveryAttemptHeader() {
63+
return false;
64+
}
65+
66+
/**
67+
* Called when an exception is thrown with no records available, e.g. if the consumer
68+
* poll throws an exception.
69+
* @param thrownException the exception.
70+
* @param consumer the consumer.
71+
* @param container the container.
72+
*/
73+
default void handleConsumerException(Exception thrownException, Consumer<?, ?> consumer,
74+
MessageListenerContainer container) {
75+
}
76+
77+
/**
78+
* Handle the exception for a record listener when {@link #remainingRecords()} returns false.
79+
* @param thrownException the exception.
80+
* @param record the record.
81+
* @param consumer the consumer.
82+
* @param container the container.
83+
* @see #remainingRecords()
84+
*/
85+
default void handle(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
86+
MessageListenerContainer container) {
87+
}
88+
89+
/**
90+
* Handle the exception for a record listener when {@link #remainingRecords()} returns true.
91+
* @param thrownException the exception.
92+
* @param records the remaining records including the one that failed.
93+
* @param consumer the consumer.
94+
* @param container the container.
95+
* @see #remainingRecords()
96+
*/
97+
default void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
98+
MessageListenerContainer container) {
99+
}
100+
101+
/**
102+
* Handle the exception for a batch listener.
103+
* @param thrownException the exception.
104+
* @param data the consumer records.
105+
* @param consumer the consumer.
106+
* @param container the container.
107+
* @param invokeListener a callback to re-invoke the listener.
108+
*/
109+
default void handle(Exception thrownException, ConsumerRecords<?, ?> data,
110+
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
111+
}
112+
113+
@Override
114+
default int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
115+
return 0;
116+
}
117+
118+
/**
119+
* Optional method to clear thread state; will be called just before a consumer
120+
* thread terminates.
121+
*/
122+
default void clearThreadState() {
123+
}
124+
125+
/**
126+
* Return true if the offset should be committed for a handled error (no exception
127+
* thrown).
128+
* @return true to commit.
129+
*/
130+
default boolean isAckAfterHandle() {
131+
return true;
132+
}
133+
134+
/**
135+
* Set to false to prevent the container from committing the offset of a recovered
136+
* record (when the error handler does not itself throw an exception).
137+
* @param ack false to not commit.
138+
*/
139+
default void setAckAfterHandle(boolean ack) {
140+
throw new UnsupportedOperationException("This error handler does not support setting this property");
141+
}
142+
143+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.Collections;
20+
import java.util.List;
21+
22+
import org.apache.kafka.clients.consumer.Consumer;
23+
import org.apache.kafka.clients.consumer.ConsumerRecord;
24+
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
26+
import org.springframework.kafka.support.TopicPartitionOffset;
27+
import org.springframework.util.Assert;
28+
29+
/**
30+
* Adapts a legacy {@link ErrorHandler} or {@link BatchErrorHandler}.
31+
*
32+
* @author Gary Russell
33+
* @since 2.7.4
34+
*
35+
*/
36+
public class ErrorHandlerAdapter implements CommonErrorHandler {
37+
38+
@SuppressWarnings({ "rawtypes", "unchecked" })
39+
private static final ConsumerRecords EMPTY_BATCH = new ConsumerRecords(Collections.emptyMap());
40+
41+
private final ErrorHandler errorHandler;
42+
43+
private final BatchErrorHandler batchErrorHandler;
44+
45+
/**
46+
* Adapt an {@link ErrorHandler}.
47+
* @param errorHandler the handler.
48+
*/
49+
public ErrorHandlerAdapter(ErrorHandler errorHandler) {
50+
Assert.notNull(errorHandler, "'errorHandler' cannot be null");
51+
this.errorHandler = errorHandler;
52+
this.batchErrorHandler = null;
53+
}
54+
55+
/**
56+
* Adapt a {@link BatchErrorHandler}.
57+
* @param batchErrorHandler the handler.
58+
*/
59+
public ErrorHandlerAdapter(BatchErrorHandler batchErrorHandler) {
60+
Assert.notNull(batchErrorHandler, "'batchErrorHandler' cannot be null");
61+
this.errorHandler = null;
62+
this.batchErrorHandler = batchErrorHandler;
63+
}
64+
65+
@Override
66+
public boolean isBatch() {
67+
return this.batchErrorHandler != null;
68+
}
69+
70+
@Override
71+
public boolean remainingRecords() {
72+
return this.errorHandler instanceof RemainingRecordsErrorHandler;
73+
}
74+
75+
@Override
76+
public boolean deliveryAttemptHeader() {
77+
return this.errorHandler instanceof DeliveryAttemptAware;
78+
}
79+
80+
@Override
81+
public void clearThreadState() {
82+
if (this.errorHandler != null) {
83+
this.errorHandler.clearThreadState();
84+
}
85+
else {
86+
this.batchErrorHandler.clearThreadState();
87+
}
88+
}
89+
90+
@Override
91+
public boolean isAckAfterHandle() {
92+
if (this.errorHandler != null) {
93+
return this.errorHandler.isAckAfterHandle();
94+
}
95+
else {
96+
return this.batchErrorHandler.isAckAfterHandle();
97+
}
98+
}
99+
100+
@Override
101+
public void setAckAfterHandle(boolean ack) {
102+
if (this.errorHandler != null) {
103+
this.errorHandler.setAckAfterHandle(ack);
104+
}
105+
else {
106+
this.batchErrorHandler.setAckAfterHandle(ack);
107+
}
108+
}
109+
110+
@Override
111+
public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
112+
return ((DeliveryAttemptAware) this.errorHandler).deliveryAttempt(topicPartitionOffset);
113+
}
114+
115+
@SuppressWarnings({ "unchecked", "rawtypes" })
116+
@Override
117+
public void handleConsumerException(Exception thrownException, Consumer<?, ?> consumer,
118+
MessageListenerContainer container) {
119+
120+
if (this.errorHandler != null) {
121+
this.errorHandler.handle(thrownException, null, consumer, container);
122+
}
123+
else {
124+
this.batchErrorHandler.handle(thrownException, EMPTY_BATCH, consumer, container);
125+
}
126+
}
127+
128+
@Override
129+
public void handle(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
130+
MessageListenerContainer container) {
131+
132+
this.errorHandler.handle(thrownException, record, consumer);
133+
}
134+
135+
@Override
136+
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
137+
MessageListenerContainer container) {
138+
139+
this.errorHandler.handle(thrownException, records, consumer, container);
140+
}
141+
142+
@Override
143+
public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
144+
MessageListenerContainer container, Runnable invokeListener) {
145+
146+
this.batchErrorHandler.handle(thrownException, data, consumer, container, invokeListener);
147+
}
148+
149+
}
150+

0 commit comments

Comments
 (0)