Skip to content

Commit b048aaa

Browse files
committed
GH-550: master to 2.2; fix tangles
Fixes #550 - package tangle `....listener` and `....listener.config` - remove config package - class tangles between `ContainerProperties` and the listener containers - AckMode moved to properties - Error handler setters moved from properties to containers
1 parent e6985e8 commit b048aaa

29 files changed

+292
-242
lines changed

Diff for: gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=2.1.6.BUILD-SNAPSHOT
1+
version=2.2.0.BUILD-SNAPSHOT

Diff for: spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

+24-6
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@
2626
import org.springframework.kafka.core.KafkaTemplate;
2727
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
2828
import org.springframework.kafka.listener.BatchErrorHandler;
29+
import org.springframework.kafka.listener.ContainerProperties;
2930
import org.springframework.kafka.listener.ErrorHandler;
31+
import org.springframework.kafka.listener.GenericErrorHandler;
3032
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
31-
import org.springframework.kafka.listener.config.ContainerProperties;
3233
import org.springframework.kafka.support.converter.MessageConverter;
3334
import org.springframework.retry.RecoveryCallback;
3435
import org.springframework.retry.support.RetryTemplate;
@@ -51,6 +52,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
5152

5253
private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null);
5354

55+
private GenericErrorHandler<?> errorHandler;
56+
5457
private ConsumerFactory<K, V> consumerFactory;
5558

5659
private Boolean autoStartup;
@@ -191,6 +194,24 @@ public void setReplyTemplate(KafkaTemplate<K, V> replyTemplate) {
191194
this.replyTemplate = replyTemplate;
192195
}
193196

197+
/**
198+
* Set the error handler to call when the listener throws an exception.
199+
* @param errorHandler the error handler.
200+
* @since 2.2
201+
*/
202+
public void setErrorHandler(ErrorHandler errorHandler) {
203+
this.errorHandler = errorHandler;
204+
}
205+
206+
/**
207+
* Set the batch error handler to call when the listener throws an exception.
208+
* @param errorHandler the error handler.
209+
* @since 2.2
210+
*/
211+
public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
212+
this.errorHandler = errorHandler;
213+
}
214+
194215
/**
195216
* Obtain the properties template for this factory - set properties as needed
196217
* and they will be copied to a final properties instance for the endpoint.
@@ -274,11 +295,8 @@ protected void initializeContainer(C instance) {
274295
if (this.containerProperties.getAckTime() > 0) {
275296
properties.setAckTime(this.containerProperties.getAckTime());
276297
}
277-
if (this.containerProperties.getGenericErrorHandler() instanceof BatchErrorHandler) {
278-
properties.setBatchErrorHandler((BatchErrorHandler) this.containerProperties.getGenericErrorHandler());
279-
}
280-
else {
281-
properties.setErrorHandler((ErrorHandler) this.containerProperties.getGenericErrorHandler());
298+
if (this.errorHandler != null) {
299+
instance.setGenericErrorHandler(this.errorHandler);
282300
}
283301
}
284302

Diff for: spring-kafka/src/main/java/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2016 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@
1919
import java.util.Collection;
2020

2121
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
22-
import org.springframework.kafka.listener.config.ContainerProperties;
22+
import org.springframework.kafka.listener.ContainerProperties;
2323
import org.springframework.kafka.support.TopicPartitionInitialOffset;
2424

2525
/**

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

+38-58
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.springframework.context.ApplicationEventPublisherAware;
3333
import org.springframework.context.SmartLifecycle;
3434
import org.springframework.kafka.core.ConsumerFactory;
35-
import org.springframework.kafka.listener.config.ContainerProperties;
3635
import org.springframework.util.Assert;
3736
import org.springframework.util.StringUtils;
3837

@@ -56,57 +55,6 @@ public abstract class AbstractMessageListenerContainer<K, V>
5655

5756
protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR
5857

59-
/**
60-
* The offset commit behavior enumeration.
61-
*/
62-
public enum AckMode {
63-
64-
/**
65-
* Commit after each record is processed by the listener.
66-
*/
67-
RECORD,
68-
69-
/**
70-
* Commit whatever has already been processed before the next poll.
71-
*/
72-
BATCH,
73-
74-
/**
75-
* Commit pending updates after
76-
* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
77-
*/
78-
TIME,
79-
80-
/**
81-
* Commit pending updates after
82-
* {@link ContainerProperties#setAckCount(int) ackCount} has been
83-
* exceeded.
84-
*/
85-
COUNT,
86-
87-
/**
88-
* Commit pending updates after
89-
* {@link ContainerProperties#setAckCount(int) ackCount} has been
90-
* exceeded or after {@link ContainerProperties#setAckTime(long)
91-
* ackTime} has elapsed.
92-
*/
93-
COUNT_TIME,
94-
95-
/**
96-
* User takes responsibility for acks using an
97-
* {@link AcknowledgingMessageListener}.
98-
*/
99-
MANUAL,
100-
101-
/**
102-
* User takes responsibility for acks using an
103-
* {@link AcknowledgingMessageListener}. The consumer
104-
* immediately processes the commit.
105-
*/
106-
MANUAL_IMMEDIATE,
107-
108-
}
109-
11058
protected final ConsumerFactory<K, V> consumerFactory; // NOSONAR (final)
11159

11260
private final ContainerProperties containerProperties;
@@ -117,6 +65,8 @@ public enum AckMode {
11765

11866
private ApplicationEventPublisher applicationEventPublisher;
11967

68+
private GenericErrorHandler<?> errorHandler;
69+
12070
private boolean autoStartup = true;
12171

12272
private int phase = DEFAULT_PHASE;
@@ -168,12 +118,6 @@ else if (containerProperties.getTopicPattern() != null) {
168118
if (this.containerProperties.getConsumerRebalanceListener() == null) {
169119
this.containerProperties.setConsumerRebalanceListener(createSimpleLoggingConsumerRebalanceListener());
170120
}
171-
if (containerProperties.getGenericErrorHandler() instanceof BatchErrorHandler) {
172-
this.containerProperties.setBatchErrorHandler((BatchErrorHandler) containerProperties.getGenericErrorHandler());
173-
}
174-
else {
175-
this.containerProperties.setErrorHandler((ErrorHandler) containerProperties.getGenericErrorHandler());
176-
}
177121
}
178122

179123
@Override
@@ -194,6 +138,42 @@ public ApplicationEventPublisher getApplicationEventPublisher() {
194138
return this.applicationEventPublisher;
195139
}
196140

141+
/**
142+
* Set the error handler to call when the listener throws an exception.
143+
* @param errorHandler the error handler.
144+
* @since 2.2
145+
*/
146+
public void setErrorHandler(ErrorHandler errorHandler) {
147+
this.errorHandler = errorHandler;
148+
}
149+
150+
/**
151+
* Set the error handler to call when the listener throws an exception.
152+
* @param errorHandler the error handler.
153+
* @since 2.2
154+
*/
155+
public void setGenericErrorHandler(GenericErrorHandler<?> errorHandler) {
156+
this.errorHandler = errorHandler;
157+
}
158+
159+
/**
160+
* Set the batch error handler to call when the listener throws an exception.
161+
* @param errorHandler the error handler.
162+
* @since 2.2
163+
*/
164+
public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
165+
this.errorHandler = errorHandler;
166+
}
167+
168+
/**
169+
* Get the configured error handler.
170+
* @return the error handler.
171+
* @since 2.2
172+
*/
173+
protected GenericErrorHandler<?> getGenericErrorHandler() {
174+
return this.errorHandler;
175+
}
176+
197177
@Override
198178
public boolean isAutoStartup() {
199179
return this.autoStartup;

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.kafka.common.TopicPartition;
3333

3434
import org.springframework.kafka.core.ConsumerFactory;
35-
import org.springframework.kafka.listener.config.ContainerProperties;
3635
import org.springframework.kafka.support.TopicPartitionInitialOffset;
3736
import org.springframework.util.Assert;
3837

@@ -161,6 +160,7 @@ protected void doStart() {
161160
container.setApplicationEventPublisher(getApplicationEventPublisher());
162161
}
163162
container.setClientIdSuffix("-" + i);
163+
container.setGenericErrorHandler(getGenericErrorHandler());
164164
container.start();
165165
this.containers.add(container);
166166
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/config/ContainerProperties.java renamed to spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

+55-35
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.listener.config;
17+
package org.springframework.kafka.listener;
1818

1919
import java.util.Arrays;
2020
import java.util.LinkedHashSet;
@@ -24,11 +24,6 @@
2424
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
2525

2626
import org.springframework.core.task.AsyncListenableTaskExecutor;
27-
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
28-
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
29-
import org.springframework.kafka.listener.BatchErrorHandler;
30-
import org.springframework.kafka.listener.ErrorHandler;
31-
import org.springframework.kafka.listener.GenericErrorHandler;
3227
import org.springframework.kafka.support.LogIfLevelEnabled;
3328
import org.springframework.kafka.support.TopicPartitionInitialOffset;
3429
import org.springframework.scheduling.TaskScheduler;
@@ -46,6 +41,57 @@
4641
*/
4742
public class ContainerProperties {
4843

44+
/**
45+
* The offset commit behavior enumeration.
46+
*/
47+
public enum AckMode {
48+
49+
/**
50+
* Commit after each record is processed by the listener.
51+
*/
52+
RECORD,
53+
54+
/**
55+
* Commit whatever has already been processed before the next poll.
56+
*/
57+
BATCH,
58+
59+
/**
60+
* Commit pending updates after
61+
* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
62+
*/
63+
TIME,
64+
65+
/**
66+
* Commit pending updates after
67+
* {@link ContainerProperties#setAckCount(int) ackCount} has been
68+
* exceeded.
69+
*/
70+
COUNT,
71+
72+
/**
73+
* Commit pending updates after
74+
* {@link ContainerProperties#setAckCount(int) ackCount} has been
75+
* exceeded or after {@link ContainerProperties#setAckTime(long)
76+
* ackTime} has elapsed.
77+
*/
78+
COUNT_TIME,
79+
80+
/**
81+
* User takes responsibility for acks using an
82+
* {@link AcknowledgingMessageListener}.
83+
*/
84+
MANUAL,
85+
86+
/**
87+
* User takes responsibility for acks using an
88+
* {@link AcknowledgingMessageListener}. The consumer
89+
* immediately processes the commit.
90+
*/
91+
MANUAL_IMMEDIATE,
92+
93+
}
94+
4995
private static final long DEFAULT_POLL_TIMEOUT = 1000L;
5096

5197
private static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
@@ -82,7 +128,7 @@ public class ContainerProperties {
82128
* {@link org.springframework.kafka.listener.AcknowledgingMessageListener}.
83129
* </ul>
84130
*/
85-
private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;
131+
private AckMode ackMode = AckMode.BATCH;
86132

87133
/**
88134
* The number of outstanding record count after which offsets should be
@@ -114,11 +160,6 @@ public class ContainerProperties {
114160
*/
115161
private AsyncListenableTaskExecutor consumerTaskExecutor;
116162

117-
/**
118-
* The error handler to call when the listener throws an exception.
119-
*/
120-
private GenericErrorHandler<?> errorHandler;
121-
122163
/**
123164
* The timeout for shutting down the container. This is the maximum amount of
124165
* time that the invocation to {@code #stop(Runnable)} will block for, before
@@ -209,7 +250,7 @@ public void setMessageListener(Object messageListener) {
209250
* </ul>
210251
* @param ackMode the {@link AckMode}; default BATCH.
211252
*/
212-
public void setAckMode(AbstractMessageListenerContainer.AckMode ackMode) {
253+
public void setAckMode(AckMode ackMode) {
213254
Assert.notNull(ackMode, "'ackMode' cannot be null");
214255
this.ackMode = ackMode;
215256
}
@@ -243,22 +284,6 @@ public void setAckTime(long ackTime) {
243284
this.ackTime = ackTime;
244285
}
245286

246-
/**
247-
* Set the error handler to call when the listener throws an exception.
248-
* @param errorHandler the error handler.
249-
*/
250-
public void setErrorHandler(ErrorHandler errorHandler) {
251-
this.errorHandler = errorHandler;
252-
}
253-
254-
/**
255-
* Set the batch error handler to call when the listener throws an exception.
256-
* @param errorHandler the error handler.
257-
*/
258-
public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
259-
this.errorHandler = errorHandler;
260-
}
261-
262287
/**
263288
* Set the executor for threads that poll the consumer.
264289
* @param consumerTaskExecutor the executor
@@ -355,7 +380,7 @@ public TopicPartitionInitialOffset[] getTopicPartitions() {
355380
return this.topicPartitions;
356381
}
357382

358-
public AbstractMessageListenerContainer.AckMode getAckMode() {
383+
public AckMode getAckMode() {
359384
return this.ackMode;
360385
}
361386

@@ -379,10 +404,6 @@ public AsyncListenableTaskExecutor getConsumerTaskExecutor() {
379404
return this.consumerTaskExecutor;
380405
}
381406

382-
public GenericErrorHandler<?> getGenericErrorHandler() {
383-
return this.errorHandler;
384-
}
385-
386407
public long getShutdownTimeout() {
387408
return this.shutdownTimeout;
388409
}
@@ -542,7 +563,6 @@ public String toString() {
542563
+ ", pollTimeout=" + this.pollTimeout
543564
+ (this.consumerTaskExecutor != null
544565
? ", consumerTaskExecutor=" + this.consumerTaskExecutor : "")
545-
+ (this.errorHandler != null ? ", errorHandler=" + this.errorHandler : "")
546566
+ ", shutdownTimeout=" + this.shutdownTimeout
547567
+ (this.consumerRebalanceListener != null
548568
? ", consumerRebalanceListener=" + this.consumerRebalanceListener : "")

0 commit comments

Comments
 (0)