Skip to content

Commit be689a3

Browse files
garyrussellartembilan
authored andcommitted
GH-609: Pause/resume polishing; events
Resolves #609 - Provide visibility to the container pause state - Add pause/resume events * Polishing - PR Comments * Checkstyle
1 parent 522f6b4 commit be689a3

File tree

9 files changed

+236
-10
lines changed

9 files changed

+236
-10
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.event;
18+
19+
import java.util.Collection;
20+
21+
import org.apache.kafka.common.TopicPartition;
22+
23+
/**
24+
* An event published when a consumer is paused.
25+
*
26+
* @author Gary Russell
27+
* @since 2.1.5
28+
*
29+
*/
30+
@SuppressWarnings("serial")
31+
public class ConsumerPausedEvent extends KafkaEvent {
32+
33+
private final Collection<TopicPartition> partitions;
34+
35+
/**
36+
* Construct an instance with the provided source and partitions.
37+
* @param source the container.
38+
* @param partitions the partitions.
39+
*/
40+
public ConsumerPausedEvent(Object source, Collection<TopicPartition> partitions) {
41+
super(source);
42+
this.partitions = partitions;
43+
}
44+
45+
public Collection<TopicPartition> getPartitions() {
46+
return this.partitions;
47+
}
48+
49+
@Override
50+
public String toString() {
51+
return "ConsumerPausedEvent [partitions=" + this.partitions + "]";
52+
}
53+
54+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.event;
18+
19+
import java.util.Collection;
20+
21+
import org.apache.kafka.common.TopicPartition;
22+
23+
/**
24+
* An event published when a consumer is resumed.
25+
*
26+
* @author Gary Russell
27+
* @since 2.1.5
28+
*
29+
*/
30+
@SuppressWarnings("serial")
31+
public class ConsumerResumedEvent extends KafkaEvent {
32+
33+
private final Collection<TopicPartition> partitions;
34+
35+
/**
36+
* Construct an instance with the provided source and partitions.
37+
* @param source the container.
38+
* @param partitions the partitions.
39+
*/
40+
public ConsumerResumedEvent(Object source, Collection<TopicPartition> partitions) {
41+
super(source);
42+
this.partitions = partitions;
43+
}
44+
45+
public Collection<TopicPartition> getPartitions() {
46+
return this.partitions;
47+
}
48+
49+
@Override
50+
public String toString() {
51+
return "ConsumerResumedEvent [partitions=" + this.partitions + "]";
52+
}
53+
54+
}

spring-kafka/src/main/java/org/springframework/kafka/event/ListenerContainerIdleEvent.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -40,15 +40,44 @@ public class ListenerContainerIdleEvent extends KafkaEvent {
4040

4141
private final List<TopicPartition> topicPartitions;
4242

43+
private final boolean paused;
44+
4345
private transient Consumer<?, ?> consumer;
4446

47+
/**
48+
* Construct an instance with the provided arguments.
49+
* @param source the container.
50+
* @param idleTime the idle time.
51+
* @param id the container id.
52+
* @param topicPartitions the topics/partitions currently assigned.
53+
* @param consumer the consumer.
54+
* @deprecated in favor of
55+
* {@link #ListenerContainerIdleEvent(Object, long, String, Collection, Consumer, boolean)}
56+
*/
57+
@Deprecated
4558
public ListenerContainerIdleEvent(Object source, long idleTime, String id,
4659
Collection<TopicPartition> topicPartitions, Consumer<?, ?> consumer) {
60+
this(source, idleTime, id, topicPartitions, consumer, false);
61+
}
62+
63+
/**
64+
* Construct an instance with the provided arguments.
65+
* @param source the container.
66+
* @param idleTime the idle time.
67+
* @param id the container id.
68+
* @param topicPartitions the topics/partitions currently assigned.
69+
* @param consumer the consumer.
70+
* @param paused true if the consumer is paused.
71+
* @since 2.1.5
72+
*/
73+
public ListenerContainerIdleEvent(Object source, long idleTime, String id,
74+
Collection<TopicPartition> topicPartitions, Consumer<?, ?> consumer, boolean paused) {
4775
super(source);
4876
this.idleTime = idleTime;
4977
this.listenerId = id;
5078
this.topicPartitions = topicPartitions == null ? null : new ArrayList<>(topicPartitions);
5179
this.consumer = consumer;
80+
this.paused = paused;
5281
}
5382

5483
/**
@@ -85,11 +114,21 @@ public String getListenerId() {
85114
return this.consumer;
86115
}
87116

117+
/**
118+
* Return true if the consumer was paused at the time the idle event was published.
119+
* @return paused.
120+
* @since 2.1.5
121+
*/
122+
public boolean isPaused() {
123+
return this.paused;
124+
}
125+
88126
@Override
89127
public String toString() {
90128
return "ListenerContainerIdleEvent [idleTime="
91129
+ ((float) this.idleTime / 1000) + "s, listenerId=" + this.listenerId
92130
+ ", container=" + getSource()
131+
+ ", paused=" + this.paused
93132
+ ", topicPartitions=" + this.topicPartitions + "]";
94133
}
95134

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,11 @@ protected boolean isPaused() {
193193
return this.paused;
194194
}
195195

196+
@Override
197+
public boolean isPauseRequested() {
198+
return this.paused;
199+
}
200+
196201
public void setPhase(int phase) {
197202
this.phase = phase;
198203
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,19 @@ public Collection<TopicPartition> getAssignedPartitions() {
109109
return assigned;
110110
}
111111

112+
@Override
113+
public boolean isContainerPaused() {
114+
boolean paused = isPaused();
115+
if (paused) {
116+
for (AbstractMessageListenerContainer<K, V> container : this.containers) {
117+
if (!container.isContainerPaused()) {
118+
return false;
119+
}
120+
}
121+
}
122+
return paused;
123+
}
124+
112125
@Override
113126
public Map<String, Map<MetricName, ? extends Metric>> metrics() {
114127
Map<String, Map<MetricName, ? extends Metric>> metrics = new HashMap<>();

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@
5555
import org.springframework.kafka.core.ConsumerFactory;
5656
import org.springframework.kafka.core.KafkaResourceHolder;
5757
import org.springframework.kafka.core.ProducerFactoryUtils;
58+
import org.springframework.kafka.event.ConsumerPausedEvent;
59+
import org.springframework.kafka.event.ConsumerResumedEvent;
5860
import org.springframework.kafka.event.ListenerContainerIdleEvent;
5961
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
6062
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback;
@@ -204,6 +206,11 @@ else if (listenerConsumer.assignedPartitions != null) {
204206
}
205207
}
206208

209+
@Override
210+
public boolean isContainerPaused() {
211+
return isPaused() && this.listenerConsumer.consumerPaused;
212+
}
213+
207214
@Override
208215
public Map<String, Map<MetricName, ? extends Metric>> metrics() {
209216
ListenerConsumer listenerConsumer = this.listenerConsumer;
@@ -288,21 +295,35 @@ public void onSuccess(Object result) {
288295
}
289296
}
290297

291-
private void publishIdleContainerEvent(long idleTime, Consumer<?, ?> consumer) {
298+
private void publishIdleContainerEvent(long idleTime, Consumer<?, ?> consumer, boolean paused) {
292299
if (getApplicationEventPublisher() != null) {
293300
getApplicationEventPublisher().publishEvent(new ListenerContainerIdleEvent(
294-
KafkaMessageListenerContainer.this, idleTime, getBeanName(), getAssignedPartitions(), consumer));
301+
this, idleTime, getBeanName(), getAssignedPartitions(), consumer, paused));
295302
}
296303
}
297304

298305
private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<?, ?> consumer) {
299306
if (getApplicationEventPublisher() != null) {
300307
getApplicationEventPublisher().publishEvent(
301-
new NonResponsiveConsumerEvent(KafkaMessageListenerContainer.this, timeSinceLastPoll,
308+
new NonResponsiveConsumerEvent(this, timeSinceLastPoll,
302309
getBeanName(), getAssignedPartitions(), consumer));
303310
}
304311
}
305312

313+
private void publishConsumerPausedEvent(Collection<TopicPartition> partitions) {
314+
if (getApplicationEventPublisher() != null) {
315+
getApplicationEventPublisher().publishEvent(new ConsumerPausedEvent(this,
316+
Collections.unmodifiableCollection(partitions)));
317+
}
318+
}
319+
320+
private void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
321+
if (getApplicationEventPublisher() != null) {
322+
getApplicationEventPublisher().publishEvent(new ConsumerResumedEvent(this,
323+
Collections.unmodifiableCollection(partitions)));
324+
}
325+
}
326+
306327
@Override
307328
public String toString() {
308329
return "KafkaMessageListenerContainer [id=" + getBeanName()
@@ -671,14 +692,17 @@ public void run() {
671692
if (this.logger.isDebugEnabled()) {
672693
this.logger.debug("Paused consumption from: " + this.consumer.paused());
673694
}
695+
publishConsumerPausedEvent(this.consumer.assignment());
674696
}
675697
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
676698
if (this.consumerPaused && !isPaused()) {
677699
if (this.logger.isDebugEnabled()) {
678700
this.logger.debug("Resuming consumption from: " + this.consumer.paused());
679701
}
680-
this.consumer.resume(this.consumer.paused());
702+
Set<TopicPartition> paused = this.consumer.paused();
703+
this.consumer.resume(paused);
681704
this.consumerPaused = false;
705+
publishConsumerResumedEvent(paused);
682706
}
683707
if (records != null && this.logger.isDebugEnabled()) {
684708
this.logger.debug("Received: " + records.count() + " records");
@@ -702,7 +726,7 @@ public void run() {
702726
if (now > lastReceive + this.containerProperties.getIdleEventInterval()
703727
&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
704728
publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener
705-
? this.consumer : null);
729+
? this.consumer : null, this.consumerPaused);
706730
lastAlertAt = now;
707731
if (this.genericListener instanceof ConsumerSeekAware) {
708732
seekPartitions(getAssignedPartitions(), true);

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,24 @@ default void resume() {
8585
throw new UnsupportedOperationException("This container doesn't support resume");
8686
}
8787

88+
/**
89+
* Return true if {@link #pause()} has been called; the container might not have actually
90+
* paused yet.
91+
* @return true if pause has been requested.
92+
* @since 2.1.5
93+
*/
94+
default boolean isPauseRequested() {
95+
throw new UnsupportedOperationException("This container doesn't support pause/resume");
96+
}
97+
98+
/**
99+
* Return true if {@link #pause()} has been called; and all consumers in this container
100+
* have actually paused.
101+
* @return true if the container is paused.
102+
* @since 2.1.5
103+
*/
104+
default boolean isContainerPaused() {
105+
throw new UnsupportedOperationException("This container doesn't support pause/resume");
106+
}
107+
88108
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@
7373
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
7474
import org.springframework.kafka.core.KafkaTemplate;
7575
import org.springframework.kafka.core.ProducerFactory;
76+
import org.springframework.kafka.event.ConsumerPausedEvent;
77+
import org.springframework.kafka.event.ConsumerResumedEvent;
7678
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
7779
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
7880
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
@@ -1649,13 +1651,13 @@ public void testPauseResume() throws Exception {
16491651
return null;
16501652
}).given(consumer).commitSync(any(Map.class));
16511653
given(consumer.assignment()).willReturn(records.keySet());
1652-
final CountDownLatch pauseLatch = new CountDownLatch(1);
1654+
final CountDownLatch pauseLatch = new CountDownLatch(2);
16531655
willAnswer(i -> {
16541656
pauseLatch.countDown();
16551657
return null;
16561658
}).given(consumer).pause(records.keySet());
16571659
given(consumer.paused()).willReturn(records.keySet());
1658-
final CountDownLatch resumeLatch = new CountDownLatch(1);
1660+
final CountDownLatch resumeLatch = new CountDownLatch(2);
16591661
willAnswer(i -> {
16601662
resumeLatch.countDown();
16611663
return null;
@@ -1669,6 +1671,14 @@ public void testPauseResume() throws Exception {
16691671
containerProps.setMessageListener((MessageListener) r -> { });
16701672
KafkaMessageListenerContainer<Integer, String> container =
16711673
new KafkaMessageListenerContainer<>(cf, containerProps);
1674+
container.setApplicationEventPublisher(e -> {
1675+
if (e instanceof ConsumerPausedEvent) {
1676+
pauseLatch.countDown();
1677+
}
1678+
else if (e instanceof ConsumerResumedEvent) {
1679+
resumeLatch.countDown();
1680+
}
1681+
});
16721682
container.start();
16731683
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
16741684
verify(consumer, times(2)).commitSync(any(Map.class));

0 commit comments

Comments
 (0)