Skip to content

Commit 17f8f85

Browse files
committed
GH-2262: Polishing
Resolves #2262
1 parent 7c52853 commit 17f8f85

10 files changed

+211
-96
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/BackOffHandler.java

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@
2727
@FunctionalInterface
2828
public interface BackOffHandler {
2929

30+
/**
31+
* Perform the next back off.
32+
* @param container the container.
33+
* @param exception the exception.
34+
* @param nextBackOff the next back off.
35+
*/
3036
void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff);
3137

3238
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.time.Duration;
20+
21+
import org.springframework.lang.Nullable;
22+
23+
/**
24+
* A {@link BackOffHandler} that pauses the container for the backoff.
25+
*
26+
* @author Gary Russell
27+
* @since 2.9
28+
*
29+
*/
30+
public class ContainerPausingBackOffHandler implements BackOffHandler {
31+
32+
private final ListenerContainerPauseService pauser;
33+
34+
/**
35+
* Create an instance with the provided {@link ListenerContainerPauseService}.
36+
* @param pauser the pause service.
37+
*/
38+
public ContainerPausingBackOffHandler(ListenerContainerPauseService pauser) {
39+
this.pauser = pauser;
40+
}
41+
42+
@Override
43+
public void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) {
44+
this.pauser.pause(container, Duration.ofMillis(nextBackOff));
45+
}
46+
47+
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

+17-11
Original file line numberDiff line numberDiff line change
@@ -107,29 +107,35 @@ public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>,
107107
/**
108108
* Construct an instance with the provided recoverer which will be called after the
109109
* backOff returns STOP for a topic/partition/offset.
110-
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
111-
* @param backOff the {@link BackOff}.
110+
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
111+
* @param backOff the {@link BackOff}.
112112
* @param kafkaOperations for sending the recovered offset to the transaction.
113113
* @param commitRecovered true to commit the recovered record's offset; requires a
114-
* {@link KafkaOperations}.
115-
* @since 2.9
114+
* {@link KafkaOperations}.
115+
* @since 2.5.3
116116
*/
117-
public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff, @Nullable KafkaOperations<?, ?> kafkaOperations, boolean commitRecovered) {
117+
public DefaultAfterRollbackProcessor(@Nullable
118+
BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer,
119+
BackOff backOff, @Nullable KafkaOperations<?, ?> kafkaOperations, boolean commitRecovered) {
120+
118121
this(recoverer, backOff, null, kafkaOperations, commitRecovered);
119122
}
120123

121124
/**
122125
* Construct an instance with the provided recoverer which will be called after the
123126
* backOff returns STOP for a topic/partition/offset.
124-
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
125-
* @param backOff the {@link BackOff}.
126-
* @param backOffHandler the {@link BackOffHandler}.
127+
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
128+
* @param backOff the {@link BackOff}.
129+
* @param backOffHandler the {@link BackOffHandler}.
127130
* @param kafkaOperations for sending the recovered offset to the transaction.
128131
* @param commitRecovered true to commit the recovered record's offset; requires a
129-
* {@link KafkaOperations}.
130-
* @since 2.5.9
132+
* {@link KafkaOperations}.
133+
* @since 2.9
131134
*/
132-
public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff, @Nullable BackOffHandler backOffHandler, @Nullable KafkaOperations<?, ?> kafkaOperations, boolean commitRecovered) {
135+
public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer,
136+
BackOff backOff, @Nullable BackOffHandler backOffHandler, @Nullable KafkaOperations<?, ?> kafkaOperations,
137+
boolean commitRecovered) {
138+
133139
super(recoverer, backOff, backOffHandler);
134140
this.kafkaTemplate = kafkaOperations;
135141
super.setCommitRecovered(commitRecovered);

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.clients.consumer.ConsumerRecords;
2424
import org.apache.kafka.common.errors.SerializationException;
2525

26+
import org.springframework.kafka.support.KafkaUtils;
2627
import org.springframework.lang.Nullable;
2728
import org.springframework.util.backoff.BackOff;
2829

@@ -90,13 +91,16 @@ public DefaultErrorHandler(@Nullable ConsumerRecordRecoverer recoverer, BackOff
9091
}
9192

9293
/**
93-
* Construct an instance with the provided recoverer which will be called after
94-
* the backOff returns STOP for a topic/partition/offset.
94+
* Construct an instance with the provided recoverer which will be called after the
95+
* backOff returns STOP for a topic/partition/offset.
9596
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
9697
* @param backOff the {@link BackOff}.
9798
* @param backOffHandler the {@link BackOffHandler}.
99+
* @since 2.9
98100
*/
99-
public DefaultErrorHandler(@Nullable ConsumerRecordRecoverer recoverer, BackOff backOff, @Nullable BackOffHandler backOffHandler) {
101+
public DefaultErrorHandler(@Nullable ConsumerRecordRecoverer recoverer, BackOff backOff,
102+
@Nullable BackOffHandler backOffHandler) {
103+
100104
super(recoverer, backOff, backOffHandler, createFallback(backOff, recoverer));
101105
}
102106

@@ -151,6 +155,7 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
151155
return getFailureTracker().recovered(record, thrownException, container, consumer);
152156
}
153157
catch (Exception ex) {
158+
logger.error(ex, "Failed to handle " + KafkaUtils.format(record) + " with " + thrownException);
154159
return false;
155160
}
156161
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

+1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
7575
* @param backOff the back off.
7676
* @param backOffHandler the {@link BackOffHandler}
7777
* @param fallbackHandler the fall back handler.
78+
* @since 2.9
7879
*/
7980
public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
8081
@Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ protected FailedRecordProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Excep
6464
this(recoverer, backOff, null);
6565
}
6666

67-
protected FailedRecordProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff, @Nullable BackOffHandler backOffHandler) {
67+
protected FailedRecordProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer,
68+
BackOff backOff, @Nullable BackOffHandler backOffHandler) {
69+
6870
this.failureTracker = new FailedRecordTracker(recoverer, backOff, backOffHandler, this.logger);
6971
this.failureTracker.setBackOffFunction(this.noRetriesForClassified);
7072
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

+10-8
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,13 @@ class FailedRecordTracker implements RecoveryStrategy {
6565
private boolean resetStateOnExceptionChange = true;
6666

6767
FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
68-
@Nullable BackOffHandler backOffHandler, LogAccessor logger) {
68+
LogAccessor logger) {
69+
70+
this(recoverer, backOff, null, logger);
71+
}
72+
73+
FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
74+
@Nullable BackOffHandler backOffHandler, LogAccessor logger) {
6975

7076
Assert.notNull(backOff, "'backOff' cannot be null");
7177
if (recoverer == null) {
@@ -77,8 +83,8 @@ class FailedRecordTracker implements RecoveryStrategy {
7783
}
7884
logger.error(thr, "Backoff "
7985
+ (failedRecord == null
80-
? "none"
81-
: failedRecord.getBackOffExecution())
86+
? "none"
87+
: failedRecord.getBackOffExecution())
8288
+ " exhausted for " + KafkaUtils.format(rec));
8389
};
8490
}
@@ -97,11 +103,6 @@ class FailedRecordTracker implements RecoveryStrategy {
97103

98104
}
99105

100-
FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
101-
LogAccessor logger) {
102-
this(recoverer, backOff, null, logger);
103-
}
104-
105106
/**
106107
* Set a function to dynamically determine the {@link BackOff} to use, based on the
107108
* consumer record and/or exception. If null is returned, the default BackOff will be
@@ -307,6 +308,7 @@ void setLastException(Exception lastException) {
307308
}
308309

309310
static class DefaultBackOffHandler implements BackOffHandler {
311+
@Override
310312
public void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) {
311313
try {
312314
if (container == null) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerPauseService.java

+30-42
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2022 the original author or authors.
2+
* Copyright 2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,105 +17,93 @@
1717
package org.springframework.kafka.listener;
1818

1919
import java.time.Duration;
20-
import java.time.LocalDateTime;
20+
import java.time.Instant;
21+
import java.time.ZoneId;
2122
import java.util.Optional;
22-
import java.util.concurrent.Executors;
23-
import java.util.concurrent.ScheduledExecutorService;
24-
import java.util.concurrent.TimeUnit;
2523

2624
import org.apache.commons.logging.LogFactory;
2725

2826
import org.springframework.core.log.LogAccessor;
2927
import org.springframework.lang.NonNull;
3028
import org.springframework.lang.Nullable;
29+
import org.springframework.scheduling.TaskScheduler;
30+
import org.springframework.util.Assert;
3131

3232
/**
3333
* Service for pausing and resuming of {@link MessageListenerContainer}.
3434
*
3535
* @author Jan Marincek
36+
* @author Gary Russell
3637
* @since 2.9
3738
*/
3839
public class ListenerContainerPauseService {
3940

4041
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(ListenerContainerPauseService.class));
42+
43+
@Nullable
4144
private final ListenerContainerRegistry registry;
42-
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
4345

44-
public ListenerContainerPauseService(ListenerContainerRegistry registry) {
45-
this.registry = registry;
46-
}
46+
private final TaskScheduler scheduler;
4747

4848
/**
49-
* Pause the listener by given id.
50-
* Checks if the listener has already been requested to pause.
51-
*
52-
* @param listenerId the id of the listener
49+
* Create an instance with the provided registry and scheduler.
50+
* @param registry the registry or null.
51+
* @param scheduler the scheduler.
5352
*/
54-
public void pause(String listenerId) {
55-
getListenerContainer(listenerId).ifPresent(this::pause);
53+
public ListenerContainerPauseService(@Nullable ListenerContainerRegistry registry, TaskScheduler scheduler) {
54+
Assert.notNull(scheduler, "'scheduler' cannot be null");
55+
this.registry = registry;
56+
this.scheduler = scheduler;
5657
}
5758

5859
/**
5960
* Pause the listener by given id.
6061
* Checks if the listener has already been requested to pause.
6162
* Sets executor schedule for resuming the same listener after pauseDuration.
62-
*
6363
* @param listenerId the id of the listener
6464
* @param pauseDuration duration between pause() and resume() actions
6565
*/
6666
public void pause(String listenerId, Duration pauseDuration) {
67+
Assert.notNull(this.registry, "Pause by id is only supported when a registry is provided");
6768
getListenerContainer(listenerId)
6869
.ifPresent(messageListenerContainer -> pause(messageListenerContainer, pauseDuration));
6970
}
7071

7172
/**
72-
* Pause the listener by given container instance.
73-
* Checks if the listener has already been requested to pause.
74-
*
73+
* Pause the listener by given container instance. Checks if the listener has already
74+
* been requested to pause. Sets executor schedule for resuming the same listener
75+
* after pauseDuration.
7576
* @param messageListenerContainer the listener container
77+
* @param pauseDuration duration between pause() and resume() actions
7678
*/
77-
public void pause(@NonNull MessageListenerContainer messageListenerContainer) {
78-
pause(messageListenerContainer, null);
79-
}
80-
81-
/**
82-
* Pause the listener by given container instance.
83-
* Checks if the listener has already been requested to pause.
84-
* Sets executor schedule for resuming the same listener after pauseDuration.
85-
*
86-
* @param messageListenerContainer the listener container
87-
* @param pauseDuration duration between pause() and resume() actions
88-
*/
89-
public void pause(@NonNull MessageListenerContainer messageListenerContainer, @Nullable Duration pauseDuration) {
79+
public void pause(MessageListenerContainer messageListenerContainer, Duration pauseDuration) {
9080
if (messageListenerContainer.isPauseRequested()) {
9181
LOGGER.debug(() -> "Container " + messageListenerContainer + " already has pause requested");
9282
}
9383
else {
94-
LOGGER.debug(() -> "Pausing container " + messageListenerContainer);
84+
Instant resumeAt = Instant.now().plusMillis(pauseDuration.toMillis());
85+
LOGGER.debug(() -> "Pausing container " + messageListenerContainer + "resume scheduled for "
86+
+ resumeAt.atZone(ZoneId.systemDefault()).toLocalDateTime());
9587
messageListenerContainer.pause();
96-
if (messageListenerContainer.getListenerId() != null && pauseDuration != null) {
97-
LOGGER.debug(() -> "Resuming of container " + messageListenerContainer + " scheduled for " + LocalDateTime.now().plus(pauseDuration));
98-
this.executor.schedule(() -> resume(messageListenerContainer.getListenerId()), pauseDuration.toMillis(), TimeUnit.MILLISECONDS);
99-
}
88+
this.scheduler.schedule(() -> resume(messageListenerContainer), resumeAt);
10089
}
10190
}
10291

10392
/**
104-
* Resume the listener by given id.
105-
*
93+
* Resume the listener container by given id.
10694
* @param listenerId the id of the listener
10795
*/
10896
public void resume(@NonNull String listenerId) {
97+
Assert.notNull(this.registry, "Resume by id is only supported when a registry is provided");
10998
getListenerContainer(listenerId).ifPresent(this::resume);
11099
}
111100

112101
/**
113-
* Resume the listener.
114-
*
102+
* Resume the listener container.
115103
* @param messageListenerContainer the listener container
116104
*/
117105
public void resume(@NonNull MessageListenerContainer messageListenerContainer) {
118-
if (messageListenerContainer.isContainerPaused()) {
106+
if (messageListenerContainer.isPauseRequested()) {
119107
LOGGER.debug(() -> "Resuming container " + messageListenerContainer);
120108
messageListenerContainer.resume();
121109
}

0 commit comments

Comments
 (0)