Skip to content

Commit b2a1d4f

Browse files
committed
GH-631: Fail fast with missing required group.id
Resolves spring-projects/spring-kafka#631
1 parent 9c03049 commit b2a1d4f

8 files changed

+202
-21
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.apache.commons.logging.Log;
2424
import org.apache.commons.logging.LogFactory;
25+
import org.apache.kafka.clients.consumer.ConsumerConfig;
2526
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
2627
import org.apache.kafka.common.TopicPartition;
2728

@@ -30,8 +31,10 @@
3031
import org.springframework.context.ApplicationEventPublisher;
3132
import org.springframework.context.ApplicationEventPublisherAware;
3233
import org.springframework.context.SmartLifecycle;
34+
import org.springframework.kafka.core.ConsumerFactory;
3335
import org.springframework.kafka.listener.config.ContainerProperties;
3436
import org.springframework.util.Assert;
37+
import org.springframework.util.StringUtils;
3538

3639
/**
3740
* The base implementation for the {@link MessageListenerContainer}.
@@ -104,6 +107,8 @@ public enum AckMode {
104107

105108
}
106109

110+
protected final ConsumerFactory<K, V> consumerFactory; // NOSONAR (final)
111+
107112
private final ContainerProperties containerProperties;
108113

109114
private final Object lifecycleMonitor = new Object();
@@ -120,9 +125,27 @@ public enum AckMode {
120125

121126
private volatile boolean paused;
122127

128+
/**
129+
* Construct an instance with the provided properties.
130+
* @param containerProperties the properties.
131+
* @deprecated in favor of
132+
* {@link #AbstractMessageListenerContainer(ConsumerFactory, ContainerProperties)}.
133+
*/
134+
@Deprecated
123135
protected AbstractMessageListenerContainer(ContainerProperties containerProperties) {
124-
Assert.notNull(containerProperties, "'containerProperties' cannot be null");
136+
this(null, containerProperties);
137+
}
125138

139+
/**
140+
* Construct an instance with the provided factory and properties.
141+
* @param consumerFactory the factory.
142+
* @param containerProperties the properties.
143+
*/
144+
protected AbstractMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
145+
ContainerProperties containerProperties) {
146+
147+
Assert.notNull(containerProperties, "'containerProperties' cannot be null");
148+
this.consumerFactory = consumerFactory;
126149
if (containerProperties.getTopics() != null) {
127150
this.containerProperties = new ContainerProperties(containerProperties.getTopics());
128151
}
@@ -219,6 +242,7 @@ public void setupMessageListener(Object messageListener) {
219242

220243
@Override
221244
public final void start() {
245+
checkGroupId();
222246
synchronized (this.lifecycleMonitor) {
223247
if (!isRunning()) {
224248
Assert.isTrue(
@@ -229,6 +253,21 @@ public final void start() {
229253
}
230254
}
231255

256+
public void checkGroupId() {
257+
if (this.containerProperties.getTopicPartitions() == null) {
258+
boolean hasGroupIdConsumerConfig = true; // assume true for non-standard containers
259+
if (this.consumerFactory != null) { // we always have one for standard containers
260+
Object groupIdConfig = this.consumerFactory.getConfigurationProperties()
261+
.get(ConsumerConfig.GROUP_ID_CONFIG);
262+
hasGroupIdConsumerConfig = groupIdConfig != null && groupIdConfig instanceof String
263+
&& StringUtils.hasText((String) groupIdConfig);
264+
}
265+
Assert.state(hasGroupIdConsumerConfig || StringUtils.hasText(this.containerProperties.getGroupId()),
266+
"No group.id found in consumer config, container properties, or @KafkaListener annotation; "
267+
+ "a group.id is required when group management is used.");
268+
}
269+
}
270+
232271
protected abstract void doStart();
233272

234273
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@
5555
*/
5656
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
5757

58-
private final ConsumerFactory<K, V> consumerFactory;
59-
6058
private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();
6159

6260
private int concurrency = 1;
@@ -70,9 +68,8 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis
7068
*/
7169
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
7270
ContainerProperties containerProperties) {
73-
super(containerProperties);
71+
super(consumerFactory, containerProperties);
7472
Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
75-
this.consumerFactory = consumerFactory;
7673
}
7774

7875
public int getConcurrency() {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
102102

103103
private final AbstractMessageListenerContainer<K, V> container;
104104

105-
private final ConsumerFactory<K, V> consumerFactory;
106-
107105
private final TopicPartitionInitialOffset[] topicPartitions;
108106

109107
private volatile ListenerConsumer listenerConsumer;
@@ -159,10 +157,9 @@ public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
159157
KafkaMessageListenerContainer(AbstractMessageListenerContainer<K, V> container,
160158
ConsumerFactory<K, V> consumerFactory,
161159
ContainerProperties containerProperties, TopicPartitionInitialOffset... topicPartitions) {
162-
super(containerProperties);
160+
super(consumerFactory, containerProperties);
163161
Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
164162
this.container = container == null ? this : container;
165-
this.consumerFactory = consumerFactory;
166163
if (topicPartitions != null) {
167164
this.topicPartitions = Arrays.copyOf(topicPartitions, topicPartitions.length);
168165
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.mockito.ArgumentMatchers.anyLong;
2121
import static org.mockito.ArgumentMatchers.anyString;
22-
import static org.mockito.ArgumentMatchers.isNull;
2322
import static org.mockito.BDDMockito.given;
2423
import static org.mockito.Mockito.mock;
2524

@@ -395,7 +394,7 @@ public void testConcurrencyWithPartitions() {
395394
};
396395
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
397396
Consumer<Integer, String> consumer = mock(Consumer.class);
398-
given(cf.createConsumer(isNull(), anyString(), anyString())).willReturn(consumer);
397+
given(cf.createConsumer(anyString(), anyString(), anyString())).willReturn(consumer);
399398
given(consumer.poll(anyLong()))
400399
.willAnswer(new Answer<ConsumerRecords<Integer, String>>() {
401400

@@ -407,6 +406,7 @@ public ConsumerRecords<Integer, String> answer(InvocationOnMock invocation) thro
407406

408407
});
409408
ContainerProperties containerProps = new ContainerProperties(topic1PartitionS);
409+
containerProps.setGroupId("grp");
410410
containerProps.setMessageListener((MessageListener<Integer, String>) message -> { });
411411

412412
ConcurrentMessageListenerContainer<Integer, String> container =

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ public void testRecordAck() throws Exception {
499499
public void testRecordAckMock() throws Exception {
500500
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
501501
Consumer<Integer, String> consumer = mock(Consumer.class);
502-
given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer);
502+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull())).willReturn(consumer);
503503
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
504504
records.put(new TopicPartition("foo", 0), Arrays.asList(
505505
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
@@ -512,6 +512,7 @@ public void testRecordAckMock() throws Exception {
512512
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
513513
new TopicPartitionInitialOffset("foo", 0) };
514514
ContainerProperties containerProps = new ContainerProperties(topicPartition);
515+
containerProps.setGroupId("grp");
515516
containerProps.setAckMode(AckMode.RECORD);
516517
final CountDownLatch latch = new CountDownLatch(2);
517518
MessageListener<Integer, String> messageListener = spy(
@@ -565,7 +566,7 @@ public void testRecordAckMockForeignThreadImmediate() throws Exception {
565566
private void testRecordAckMockForeignThreadGuts(AckMode ackMode) throws Exception {
566567
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
567568
Consumer<Integer, String> consumer = mock(Consumer.class);
568-
given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer);
569+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull())).willReturn(consumer);
569570
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
570571
records.put(new TopicPartition("foo", 0), Arrays.asList(
571572
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
@@ -578,6 +579,7 @@ private void testRecordAckMockForeignThreadGuts(AckMode ackMode) throws Exceptio
578579
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
579580
new TopicPartitionInitialOffset("foo", 0) };
580581
ContainerProperties containerProps = new ContainerProperties(topicPartition);
582+
containerProps.setGroupId("grp");
581583
containerProps.setAckMode(ackMode);
582584
final CountDownLatch latch = new CountDownLatch(2);
583585
final List<Acknowledgment> acks = new ArrayList<>();
@@ -627,7 +629,7 @@ public void onMessage(ConsumerRecord<Integer, String> data, Acknowledgment ackno
627629
public void testNonResponsiveConsumerEvent() throws Exception {
628630
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
629631
Consumer<Integer, String> consumer = mock(Consumer.class);
630-
given(cf.createConsumer(isNull(), eq(""), isNull())).willReturn(consumer);
632+
given(cf.createConsumer(eq("grp"), eq(""), isNull())).willReturn(consumer);
631633
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
632634
records.put(new TopicPartition("foo", 0), Arrays.asList(
633635
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
@@ -644,6 +646,7 @@ public void testNonResponsiveConsumerEvent() throws Exception {
644646
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
645647
new TopicPartitionInitialOffset("foo", 0) };
646648
ContainerProperties containerProps = new ContainerProperties(topicPartition);
649+
containerProps.setGroupId("grp");
647650
containerProps.setNoPollThreshold(2.0f);
648651
containerProps.setPollTimeout(10);
649652
containerProps.setMonitorInterval(1);
@@ -1703,7 +1706,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
17031706
public void testPauseResume() throws Exception {
17041707
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
17051708
Consumer<Integer, String> consumer = mock(Consumer.class);
1706-
given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer);
1709+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull())).willReturn(consumer);
17071710
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
17081711
records.put(new TopicPartition("foo", 0), Arrays.asList(
17091712
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
@@ -1735,6 +1738,7 @@ public void testPauseResume() throws Exception {
17351738
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
17361739
new TopicPartitionInitialOffset("foo", 0) };
17371740
ContainerProperties containerProps = new ContainerProperties(topicPartition);
1741+
containerProps.setGroupId("grp");
17381742
containerProps.setAckMode(AckMode.RECORD);
17391743
containerProps.setClientId("clientId");
17401744
containerProps.setIdleEventInterval(100L);
@@ -1764,7 +1768,7 @@ else if (e instanceof ConsumerResumedEvent) {
17641768
public void testInitialSeek() throws Exception {
17651769
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
17661770
Consumer<Integer, String> consumer = mock(Consumer.class);
1767-
given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer);
1771+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull())).willReturn(consumer);
17681772
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
17691773
final CountDownLatch latch = new CountDownLatch(1);
17701774
given(consumer.poll(anyLong())).willAnswer(i -> {
@@ -1781,6 +1785,7 @@ public void testInitialSeek() throws Exception {
17811785
new TopicPartitionInitialOffset("foo", 5, SeekPosition.END),
17821786
};
17831787
ContainerProperties containerProps = new ContainerProperties(topicPartition);
1788+
containerProps.setGroupId("grp");
17841789
containerProps.setAckMode(AckMode.RECORD);
17851790
containerProps.setClientId("clientId");
17861791
containerProps.setMessageListener((MessageListener) r -> { });
@@ -1868,7 +1873,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
18681873
public void testAckModeCount() throws Exception {
18691874
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
18701875
Consumer<Integer, String> consumer = mock(Consumer.class);
1871-
given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer);
1876+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull())).willReturn(consumer);
18721877
TopicPartition topicPartition = new TopicPartition("foo", 0);
18731878
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records1 = new HashMap<>();
18741879
records1.put(topicPartition, Arrays.asList(
@@ -1911,6 +1916,7 @@ public void testAckModeCount() throws Exception {
19111916
TopicPartitionInitialOffset[] topicPartitionOffset = new TopicPartitionInitialOffset[] {
19121917
new TopicPartitionInitialOffset("foo", 0) };
19131918
ContainerProperties containerProps = new ContainerProperties(topicPartitionOffset);
1919+
containerProps.setGroupId("grp");
19141920
containerProps.setAckMode(AckMode.COUNT);
19151921
containerProps.setAckCount(3);
19161922
containerProps.setClientId("clientId");
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
20+
21+
import java.util.Collections;
22+
23+
import org.apache.kafka.clients.consumer.ConsumerConfig;
24+
import org.apache.kafka.common.serialization.StringDeserializer;
25+
import org.junit.ClassRule;
26+
import org.junit.Test;
27+
28+
import org.springframework.context.ApplicationContextException;
29+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.kafka.annotation.EnableKafka;
33+
import org.springframework.kafka.annotation.KafkaListener;
34+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
35+
import org.springframework.kafka.core.ConsumerFactory;
36+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
37+
import org.springframework.kafka.listener.config.ContainerProperties;
38+
import org.springframework.kafka.support.TopicPartitionInitialOffset;
39+
import org.springframework.kafka.test.rule.KafkaEmbedded;
40+
41+
/**
42+
* @author Gary Russell
43+
* @since 2.1.5
44+
*
45+
*/
46+
public class MissingGroupIdTests {
47+
48+
@ClassRule
49+
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "missing.group");
50+
51+
@Test
52+
public void testContextFailsWithKafkaListener() {
53+
assertThatExceptionOfType(ApplicationContextException.class).isThrownBy(() -> {
54+
new AnnotationConfigApplicationContext(Config1.class);
55+
})
56+
.withCauseInstanceOf(IllegalStateException.class)
57+
.withMessageContaining("No group.id found in consumer config");
58+
}
59+
60+
@Test
61+
public void testContextFailsWithSubscribedContainer() {
62+
assertThatExceptionOfType(ApplicationContextException.class).isThrownBy(() -> {
63+
new AnnotationConfigApplicationContext(Config2.class);
64+
})
65+
.withCauseInstanceOf(IllegalStateException.class)
66+
.withMessageContaining("No group.id found in consumer config");
67+
}
68+
69+
@Test
70+
public void testContextLoadsWithAssignedContainer() {
71+
new AnnotationConfigApplicationContext(Config3.class).close();
72+
}
73+
74+
@Configuration
75+
@EnableKafka
76+
public static class Config1 {
77+
78+
@Bean
79+
public ConsumerFactory<String, String> cf() {
80+
return new DefaultKafkaConsumerFactory<>(
81+
Collections.singletonMap(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
82+
kafkaEmbedded.getBrokersAsString()),
83+
new StringDeserializer(), new StringDeserializer());
84+
}
85+
86+
@Bean
87+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
88+
ConcurrentKafkaListenerContainerFactory<String, String> factory =
89+
new ConcurrentKafkaListenerContainerFactory<>();
90+
factory.setConsumerFactory(cf());
91+
return factory;
92+
}
93+
94+
@KafkaListener(topics = "missing.group")
95+
public void listen(String in) {
96+
// no op
97+
}
98+
99+
}
100+
101+
@Configuration
102+
@EnableKafka
103+
public static class Config2 {
104+
105+
@Bean
106+
public ConsumerFactory<String, String> cf() {
107+
return new DefaultKafkaConsumerFactory<>(
108+
Collections.singletonMap(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
109+
kafkaEmbedded.getBrokersAsString()),
110+
new StringDeserializer(), new StringDeserializer());
111+
}
112+
113+
@Bean
114+
public KafkaMessageListenerContainer<String, String> container() {
115+
ContainerProperties props = new ContainerProperties("missing.group");
116+
return new KafkaMessageListenerContainer<>(cf(), props);
117+
}
118+
119+
}
120+
121+
@Configuration
122+
@EnableKafka
123+
public static class Config3 {
124+
125+
@Bean
126+
public ConsumerFactory<String, String> cf() {
127+
return new DefaultKafkaConsumerFactory<>(
128+
Collections.singletonMap(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
129+
kafkaEmbedded.getBrokersAsString()),
130+
new StringDeserializer(), new StringDeserializer());
131+
}
132+
133+
@Bean
134+
public KafkaMessageListenerContainer<String, String> container() {
135+
ContainerProperties props = new ContainerProperties(new TopicPartitionInitialOffset("missing.group", 0));
136+
props.setMessageListener((MessageListener<String, String>) r -> { });
137+
return new KafkaMessageListenerContainer<>(cf(), props);
138+
}
139+
140+
}
141+
142+
}

0 commit comments

Comments
 (0)