Skip to content

Commit a9ec919

Browse files
authored
GH-2355: Add ManualAckListenerErrorHandler (#2356)
* GH-2355: Add ManualAckListenerErrorHandler Resolves #2355 **cherry-pick to 2.9.x** * Fix typo.
1 parent 4506809 commit a9ec919

File tree

7 files changed

+169
-46
lines changed

7 files changed

+169
-46
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

+4-43
Original file line numberDiff line numberDiff line change
@@ -5049,56 +5049,17 @@ Object handleError(Message<?> message, ListenerExecutionFailedException exceptio
50495049
----
50505050
====
50515051

5052-
If your error handler implements this interface, you can, for example, adjust the offsets accordingly.
5053-
For example, to reset the offset to replay the failed message, you could do something like the following:
5052+
Another sub-interface (`ManualAckListenerErrorHandler`) provides access to the `Acknowledgment` object when using manual `AckMode` s.
50545053

50555054
====
50565055
[source, java]
50575056
----
5058-
@Bean
5059-
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
5060-
return (m, e, c) -> {
5061-
this.listen3Exception = e;
5062-
MessageHeaders headers = m.getHeaders();
5063-
c.seek(new org.apache.kafka.common.TopicPartition(
5064-
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
5065-
headers.get(KafkaHeaders.RECEIVED_PARTITION, Integer.class)),
5066-
headers.get(KafkaHeaders.OFFSET, Long.class));
5067-
return null;
5068-
};
5069-
}
5057+
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
5058+
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
50705059
----
50715060
====
50725061

5073-
Similarly, you could do something like the following for a batch listener:
5074-
5075-
====
5076-
[source, java]
5077-
----
5078-
@Bean
5079-
public ConsumerAwareListenerErrorHandler listen10ErrorHandler() {
5080-
return (m, e, c) -> {
5081-
this.listen10Exception = e;
5082-
MessageHeaders headers = m.getHeaders();
5083-
List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
5084-
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION, List.class);
5085-
List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
5086-
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
5087-
for (int i = 0; i < topics.size(); i++) {
5088-
int index = i;
5089-
offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
5090-
(k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
5091-
}
5092-
offsetsToReset.forEach((k, v) -> c.seek(k, v));
5093-
return null;
5094-
};
5095-
}
5096-
----
5097-
====
5098-
5099-
This resets each topic/partition in the batch to the lowest offset in the batch.
5100-
5101-
NOTE: The preceding two examples are simplistic implementations, and you would probably want more checking in the error handler.
5062+
In either case, you should NOT perform any seeks on the consumer because the container would be unaware of them.
51025063

51035064
[[error-handlers]]
51045065
===== Container Error Handlers

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareListenerErrorHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public interface ConsumerAwareListenerErrorHandler extends KafkaListenerErrorHan
3434

3535
@Override
3636
default Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
37-
throw new UnsupportedOperationException("Container should never call this");
37+
throw new UnsupportedOperationException("Adapter should never call this");
3838
}
3939

4040
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java

+18
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import org.apache.kafka.clients.consumer.Consumer;
2020

21+
import org.springframework.kafka.support.Acknowledgment;
22+
import org.springframework.lang.Nullable;
2123
import org.springframework.messaging.Message;
2224

2325
/**
@@ -60,4 +62,20 @@ default Object handleError(Message<?> message, ListenerExecutionFailedException
6062
return handleError(message, exception);
6163
}
6264

65+
/**
66+
* Handle the error.
67+
* @param message the spring-messaging message.
68+
* @param exception the exception the listener threw, wrapped in a
69+
* {@link ListenerExecutionFailedException}.
70+
* @param consumer the consumer.
71+
* @param ack the {@link Acknowledgment}.
72+
* @return the return value is ignored unless the annotated method has a
73+
* {@code @SendTo} annotation.
74+
*/
75+
default Object handleError(Message<?> message, ListenerExecutionFailedException exception,
76+
Consumer<?, ?> consumer, @Nullable Acknowledgment ack) {
77+
78+
return handleError(message, exception, consumer);
79+
}
80+
6381
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.apache.kafka.clients.consumer.Consumer;
20+
21+
import org.springframework.kafka.support.Acknowledgment;
22+
import org.springframework.lang.Nullable;
23+
import org.springframework.messaging.Message;
24+
25+
/**
26+
* A {@link KafkaListenerErrorHandler} that supports manual acks.
27+
*
28+
* @author Gary Russell
29+
* @since 2.9
30+
*
31+
*/
32+
@FunctionalInterface
33+
public interface ManualAckListenerErrorHandler extends KafkaListenerErrorHandler {
34+
35+
@Override
36+
default Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
37+
throw new UnsupportedOperationException("Adapter should never call this");
38+
}
39+
40+
@Override
41+
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer,
42+
@Nullable Acknowledgment ack);
43+
44+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C
188188
if (message.equals(NULL_MESSAGE)) {
189189
message = new GenericMessage<>(records);
190190
}
191-
Object result = this.errorHandler.handleError(message, e, consumer);
191+
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
192192
if (result != null) {
193193
handleResult(result, records, message);
194194
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment ackn
100100
if (message.equals(NULL_MESSAGE)) {
101101
message = new GenericMessage<>(record);
102102
}
103-
Object result = this.errorHandler.handleError(message, e, consumer);
103+
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
104104
if (result != null) {
105105
handleResult(result, record, message);
106106
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.BDDMockito.willThrow;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.verify;
23+
24+
import java.lang.reflect.Method;
25+
import java.util.Collections;
26+
import java.util.List;
27+
28+
import org.apache.kafka.clients.consumer.Consumer;
29+
import org.apache.kafka.clients.consumer.ConsumerRecord;
30+
import org.junit.jupiter.api.Test;
31+
32+
import org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter;
33+
import org.springframework.kafka.listener.adapter.HandlerAdapter;
34+
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
35+
import org.springframework.kafka.support.Acknowledgment;
36+
37+
/**
38+
* @author Gary Russell
39+
* @since 2.9
40+
*
41+
*/
42+
public class ListenerErrorHandlerTests {
43+
44+
private static Method test1;
45+
46+
private static Method test2;
47+
48+
static {
49+
try {
50+
test1 = TestListener.class.getDeclaredMethod("test1", String.class, Acknowledgment.class);
51+
test2 = TestListener.class.getDeclaredMethod("test2", List.class, Acknowledgment.class);
52+
}
53+
catch (NoSuchMethodException | SecurityException e) {
54+
throw new IllegalStateException(e);
55+
}
56+
}
57+
58+
@SuppressWarnings({ "rawtypes", "unchecked" })
59+
@Test
60+
void record() throws Exception {
61+
RecordMessagingMessageListenerAdapter adapter = new RecordMessagingMessageListenerAdapter(getClass(), test1,
62+
(ManualAckListenerErrorHandler) (msg, ex, cons, ack) -> {
63+
ack.acknowledge();
64+
return null;
65+
});
66+
HandlerAdapter handler = mock(HandlerAdapter.class);
67+
willThrow(new RuntimeException("test")).given(handler).invoke(any(), any());
68+
adapter.setHandlerMethod(handler);
69+
Acknowledgment ack = mock(Acknowledgment.class);
70+
adapter.onMessage(mock(ConsumerRecord.class), ack, mock(Consumer.class));
71+
verify(ack).acknowledge();
72+
}
73+
74+
@SuppressWarnings({ "rawtypes", "unchecked" })
75+
@Test
76+
void batch() throws Exception {
77+
BatchMessagingMessageListenerAdapter adapter = new BatchMessagingMessageListenerAdapter(getClass(), test2,
78+
(ManualAckListenerErrorHandler) (msg, ex, cons, ack) -> {
79+
ack.acknowledge();
80+
return null;
81+
});
82+
HandlerAdapter handler = mock(HandlerAdapter.class);
83+
willThrow(new RuntimeException("test")).given(handler).invoke(any(), any());
84+
adapter.setHandlerMethod(handler);
85+
Acknowledgment ack = mock(Acknowledgment.class);
86+
adapter.onMessage(Collections.emptyList(), ack, mock(Consumer.class));
87+
verify(ack).acknowledge();
88+
}
89+
90+
private static class TestListener {
91+
92+
void test1(String foo, Acknowledgment ack) {
93+
}
94+
95+
void test2(List<String> foo, Acknowledgment ack) {
96+
}
97+
98+
}
99+
100+
}

0 commit comments

Comments
 (0)