Skip to content
This repository was archived by the owner on Mar 30, 2023. It is now read-only.

Commit 51de03b

Browse files
Wallieeartembilan
authored andcommitted
Use ConsumerProperties in KafkaMessageSource
* Preserve existing constructors * Add @deprecated in java docs
1 parent 1903849 commit 51de03b

File tree

11 files changed

+355
-85
lines changed

11 files changed

+355
-85
lines changed

src/main/java/org/springframework/integration/kafka/config/xml/KafkaInboundChannelAdapterParser.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
* Parser for the inbound channel adapter.
3131
*
3232
* @author Gary Russell
33+
* @author Anshul Mehra
3334
* @since 3.2
3435
*
3536
*/
@@ -44,6 +45,10 @@ protected boolean shouldGenerateIdAsFallback() {
4445
protected BeanMetadataElement parseSource(Element element, ParserContext parserContext) {
4546
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(KafkaMessageSource.class);
4647
builder.addConstructorArgReference(element.getAttribute("consumer-factory"));
48+
boolean hasConsumerProperties = StringUtils.hasText(element.getAttribute("consumer-properties"));
49+
if (hasConsumerProperties) {
50+
builder.addConstructorArgReference(element.getAttribute("consumer-properties"));
51+
}
4752
String attribute = element.getAttribute("ack-factory");
4853
if (StringUtils.hasText(attribute)) {
4954
builder.addConstructorArgReference(attribute);
@@ -52,13 +57,15 @@ protected BeanMetadataElement parseSource(Element element, ParserContext parserC
5257
if (StringUtils.hasText(attribute)) {
5358
builder.addConstructorArgValue(attribute);
5459
}
55-
builder.addConstructorArgValue(element.getAttribute("topics"));
56-
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "client-id");
57-
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "group-id");
60+
if (!hasConsumerProperties) {
61+
builder.addConstructorArgValue(element.getAttribute("topics"));
62+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "client-id");
63+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "group-id");
64+
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "rebalance-listener");
65+
}
5866
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "message-converter");
5967
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "payload-type");
6068
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "raw-header", "rawMessageHeader");
61-
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "rebalance-listener");
6269
return builder.getBeanDefinition();
6370
}
6471

src/main/java/org/springframework/integration/kafka/dsl/Kafka.java

Lines changed: 81 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.springframework.kafka.core.KafkaTemplate;
2828
import org.springframework.kafka.core.ProducerFactory;
2929
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
30+
import org.springframework.kafka.listener.ConsumerProperties;
3031
import org.springframework.kafka.listener.ContainerProperties;
3132
import org.springframework.kafka.listener.GenericMessageListenerContainer;
3233
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
@@ -38,6 +39,7 @@
3839
* @author Artem Bilan
3940
* @author Nasko Vasilev
4041
* @author Gary Russell
42+
* @author Anshul Mehra
4143
*
4244
* @since 3.0
4345
*/
@@ -79,67 +81,133 @@ public static <K, V> KafkaProducerMessageHandlerSpec.KafkaProducerMessageHandler
7981
* @param <V> the Kafka message value type.
8082
* @return the spec.
8183
* @since 3.0.1
84+
* @deprecated in favor of {@link #inboundChannelAdapter(ConsumerFactory, ConsumerProperties)}
8285
*/
86+
@Deprecated
8387
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
8488
ConsumerFactory<K, V> consumerFactory, String... topics) {
8589

86-
return inboundChannelAdapter(consumerFactory, false, topics);
90+
return inboundChannelAdapter(consumerFactory, new ConsumerProperties(topics), false);
8791
}
8892

8993
/**
9094
* Create an initial {@link KafkaInboundChannelAdapterSpec} with the consumer factory and
91-
* topics.
95+
* topics with a custom ack callback factory.
9296
* @param consumerFactory the consumer factory.
93-
* @param allowMultiFetch true to fetch multiple records on each poll.
97+
* @param ackCallbackFactory the callback factory.
9498
* @param topics the topic(s).
9599
* @param <K> the Kafka message key type.
96100
* @param <V> the Kafka message value type.
97101
* @return the spec.
98-
* @since 3.2
102+
* @since 3.0.1
103+
* @deprecated in favor of
104+
* {@link #inboundChannelAdapter(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory)}
99105
*/
106+
@Deprecated
100107
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
101-
ConsumerFactory<K, V> consumerFactory, boolean allowMultiFetch, String... topics) {
108+
ConsumerFactory<K, V> consumerFactory,
109+
KafkaAckCallbackFactory<K, V> ackCallbackFactory, String... topics) {
102110

103-
return new KafkaInboundChannelAdapterSpec<>(consumerFactory, allowMultiFetch, topics);
111+
return inboundChannelAdapter(consumerFactory, ackCallbackFactory, false, topics);
104112
}
105113

106114
/**
107115
* Create an initial {@link KafkaInboundChannelAdapterSpec} with the consumer factory and
108116
* topics with a custom ack callback factory.
109117
* @param consumerFactory the consumer factory.
110118
* @param ackCallbackFactory the callback factory.
119+
* @param allowMultiFetch true to fetch multiple records on each poll.
111120
* @param topics the topic(s).
112121
* @param <K> the Kafka message key type.
113122
* @param <V> the Kafka message value type.
114123
* @return the spec.
115124
* @since 3.0.1
125+
* @deprecated in favor of
126+
* {@link #inboundChannelAdapter(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)}
116127
*/
128+
@Deprecated
117129
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
118130
ConsumerFactory<K, V> consumerFactory,
119-
KafkaAckCallbackFactory<K, V> ackCallbackFactory, String... topics) {
131+
KafkaAckCallbackFactory<K, V> ackCallbackFactory,
132+
boolean allowMultiFetch,
133+
String... topics) {
120134

121-
return inboundChannelAdapter(consumerFactory, ackCallbackFactory, false, topics);
135+
return new KafkaInboundChannelAdapterSpec<>(consumerFactory, new ConsumerProperties(topics), ackCallbackFactory, allowMultiFetch);
136+
}
137+
138+
/**
139+
* Create an initial {@link KafkaInboundChannelAdapterSpec} with the consumer factory and
140+
* topics.
141+
* @param consumerFactory the consumer factory.
142+
* @param consumerProperties the consumerProperties.
143+
* @param <K> the Kafka message key type.
144+
* @param <V> the Kafka message value type.
145+
* @return the spec.
146+
* @since 3.2
147+
*/
148+
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
149+
ConsumerFactory<K, V> consumerFactory, ConsumerProperties consumerProperties) {
150+
151+
return inboundChannelAdapter(consumerFactory, consumerProperties, false);
152+
}
153+
154+
/**
155+
* Create an initial {@link KafkaInboundChannelAdapterSpec} with the consumer factory and
156+
* topics.
157+
* @param consumerFactory the consumer factory.
158+
* @param consumerProperties the consumerProperties.
159+
* @param allowMultiFetch true to fetch multiple records on each poll.
160+
* @param <K> the Kafka message key type.
161+
* @param <V> the Kafka message value type.
162+
* @return the spec.
163+
* @since 3.2
164+
*/
165+
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
166+
ConsumerFactory<K, V> consumerFactory,
167+
ConsumerProperties consumerProperties,
168+
boolean allowMultiFetch) {
169+
170+
return new KafkaInboundChannelAdapterSpec<>(consumerFactory, consumerProperties, allowMultiFetch);
171+
}
172+
173+
/**
174+
* Create an initial {@link KafkaInboundChannelAdapterSpec} with the consumer factory and
175+
* topics with a custom ack callback factory.
176+
* @param consumerFactory the consumer factory.
177+
* @param consumerProperties the consumerProperties.
178+
* @param ackCallbackFactory the callback factory.
179+
* @param <K> the Kafka message key type.
180+
* @param <V> the Kafka message value type.
181+
* @return the spec.
182+
* @since 3.2
183+
*/
184+
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
185+
ConsumerFactory<K, V> consumerFactory,
186+
ConsumerProperties consumerProperties,
187+
KafkaAckCallbackFactory<K, V> ackCallbackFactory) {
188+
189+
return inboundChannelAdapter(consumerFactory, consumerProperties, ackCallbackFactory, false);
122190
}
123191

124192
/**
125193
* Create an initial {@link KafkaInboundChannelAdapterSpec} with the consumer factory and
126194
* topics with a custom ack callback factory.
127195
* @param consumerFactory the consumer factory.
196+
* @param consumerProperties the consumerProperties.
128197
* @param ackCallbackFactory the callback factory.
129198
* @param allowMultiFetch true to fetch multiple records on each poll.
130-
* @param topics the topic(s).
131199
* @param <K> the Kafka message key type.
132200
* @param <V> the Kafka message value type.
133201
* @return the spec.
134-
* @since 3.0.1
202+
* @since 3.2
135203
*/
136204
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
137205
ConsumerFactory<K, V> consumerFactory,
206+
ConsumerProperties consumerProperties,
138207
KafkaAckCallbackFactory<K, V> ackCallbackFactory,
139-
boolean allowMultiFetch,
140-
String... topics) {
208+
boolean allowMultiFetch) {
141209

142-
return new KafkaInboundChannelAdapterSpec<>(consumerFactory, ackCallbackFactory, allowMultiFetch, topics);
210+
return new KafkaInboundChannelAdapterSpec<>(consumerFactory, consumerProperties, ackCallbackFactory, allowMultiFetch);
143211
}
144212

145213
/**

src/main/java/org/springframework/integration/kafka/dsl/KafkaInboundChannelAdapterSpec.java

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717
package org.springframework.integration.kafka.dsl;
1818

1919
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
20+
import org.apache.kafka.clients.consumer.ConsumerRecord;
2021

22+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2123
import org.springframework.integration.dsl.MessageSourceSpec;
2224
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
2325
import org.springframework.integration.kafka.inbound.KafkaMessageSource.KafkaAckCallbackFactory;
2426
import org.springframework.kafka.core.ConsumerFactory;
27+
import org.springframework.kafka.listener.ConsumerProperties;
28+
import org.springframework.kafka.support.KafkaHeaders;
29+
import org.springframework.kafka.support.converter.MessagingMessageConverter;
2530
import org.springframework.kafka.support.converter.RecordMessageConverter;
2631

2732
/**
@@ -31,53 +36,155 @@
3136
* @param <V> the value type.
3237
*
3338
* @author Gary Russell
39+
* @author Anshul Mehra
3440
*
3541
* @since 3.0.1
3642
*
3743
*/
3844
public class KafkaInboundChannelAdapterSpec<K, V>
3945
extends MessageSourceSpec<KafkaInboundChannelAdapterSpec<K, V>, KafkaMessageSource<K, V>> {
4046

47+
/**
48+
* Create an initial {@link KafkaMessageSource} with the consumer factory and
49+
* topics.
50+
* @param consumerFactory the consumer factory.
51+
* @param allowMultiFetch true to allow {@code max.poll.records > 1}.
52+
* @param topics the topics.
53+
* @deprecated in favor of
54+
* {@link #KafkaInboundChannelAdapterSpec(ConsumerFactory, ConsumerProperties, boolean)}
55+
*/
56+
@Deprecated
4157
KafkaInboundChannelAdapterSpec(ConsumerFactory<K, V> consumerFactory, boolean allowMultiFetch, String... topics) {
42-
this.target = new KafkaMessageSource<>(consumerFactory, allowMultiFetch, topics);
58+
this.target = new KafkaMessageSource<>(consumerFactory, new ConsumerProperties(topics), allowMultiFetch);
4359
}
4460

61+
/**
62+
* Create an initial {@link KafkaMessageSource} with the consumer factory and
63+
* topics with a custom ack callback factory.
64+
* @param consumerFactory the consumer factory.
65+
* @param ackCallbackFactory the callback factory.
66+
* @param allowMultiFetch true to allow {@code max.poll.records > 1}.
67+
* @param topics the topics.
68+
* @deprecated in favor of
69+
* {@link #KafkaInboundChannelAdapterSpec(ConsumerFactory, ConsumerProperties,
70+
* KafkaAckCallbackFactory, boolean)}
71+
*/
72+
@Deprecated
4573
KafkaInboundChannelAdapterSpec(ConsumerFactory<K, V> consumerFactory,
4674
KafkaAckCallbackFactory<K, V> ackCallbackFactory, boolean allowMultiFetch, String... topics) {
4775

48-
this.target = new KafkaMessageSource<>(consumerFactory, ackCallbackFactory, allowMultiFetch, topics);
76+
this.target = new KafkaMessageSource<>(consumerFactory, new ConsumerProperties(topics), ackCallbackFactory, allowMultiFetch);
4977
}
5078

79+
/**
80+
* Create an initial {@link KafkaMessageSource} with the consumer factory and
81+
* topics with a custom ack callback factory.
82+
* @param consumerFactory the consumer factory.
83+
* @param consumerProperties the consumer properties.
84+
* @param allowMultiFetch true to allow {@code max.poll.records > 1}.
85+
*/
86+
KafkaInboundChannelAdapterSpec(ConsumerFactory<K, V> consumerFactory,
87+
ConsumerProperties consumerProperties, boolean allowMultiFetch) {
88+
this.target = new KafkaMessageSource<>(consumerFactory, consumerProperties, allowMultiFetch);
89+
}
90+
91+
/**
92+
* Create an initial {@link KafkaMessageSource} with the consumer factory and
93+
* topics with a custom ack callback factory.
94+
* @param consumerFactory the consumer factory.
95+
* @param consumerProperties the consumer properties.
96+
* @param ackCallbackFactory the callback factory.
97+
* @param allowMultiFetch true to allow {@code max.poll.records > 1}.
98+
*/
99+
KafkaInboundChannelAdapterSpec(ConsumerFactory<K, V> consumerFactory,
100+
ConsumerProperties consumerProperties,
101+
KafkaAckCallbackFactory<K, V> ackCallbackFactory, boolean allowMultiFetch) {
102+
103+
this.target = new KafkaMessageSource<>(consumerFactory, consumerProperties, ackCallbackFactory, allowMultiFetch);
104+
}
105+
106+
/**
107+
* Set the group.id property for the consumer.
108+
* @param groupId the group id.
109+
* @return the spec.
110+
* @see ConsumerProperties
111+
* @deprecated in favor of using {@link ConsumerProperties}
112+
*/
113+
@Deprecated
51114
public KafkaInboundChannelAdapterSpec<K, V> groupId(String groupId) {
52115
this.target.setGroupId(groupId);
53116
return this;
54117
}
55118

119+
/**
120+
* Set the client.id property for the consumer.
121+
* @param clientId the client id.
122+
* @return the spec.
123+
* @see ConsumerProperties
124+
* @deprecated in favor of using {@link ConsumerProperties}
125+
*/
126+
@Deprecated
56127
public KafkaInboundChannelAdapterSpec<K, V> clientId(String clientId) {
57128
this.target.setClientId(clientId);
58129
return this;
59130
}
60131

132+
/**
133+
* Set the pollTimeout for the poll() operations.
134+
* @param pollTimeout the poll timeout.
135+
* @return the spec.
136+
* @see ConsumerProperties
137+
* @deprecated in favor of using {@link ConsumerProperties}
138+
*/
139+
@Deprecated
61140
public KafkaInboundChannelAdapterSpec<K, V> pollTimeout(long pollTimeout) {
62141
this.target.setPollTimeout(pollTimeout);
63142
return this;
64143
}
65144

145+
/**
146+
* Set the message converter to replace the default.
147+
* {@link MessagingMessageConverter}.
148+
* @param messageConverter the converter.
149+
* @return the spec.
150+
*/
66151
public KafkaInboundChannelAdapterSpec<K, V> messageConverter(RecordMessageConverter messageConverter) {
67152
this.target.setMessageConverter(messageConverter);
68153
return this;
69154
}
70155

156+
/**
157+
* Set the payload type.
158+
* Only applies if a type-aware message converter is provided.
159+
* @param type the type to convert to.
160+
* @return the spec.
161+
*/
71162
public KafkaInboundChannelAdapterSpec<K, V> payloadType(Class<?> type) {
72163
this.target.setPayloadType(type);
73164
return this;
74165
}
75166

167+
/**
168+
* Set a rebalance listener.
169+
* @param rebalanceListener the rebalance listener.
170+
* @return the spec.
171+
* @see ConsumerProperties
172+
* @deprecated in favor of using {@link ConsumerProperties}
173+
*/
174+
@Deprecated
76175
public KafkaInboundChannelAdapterSpec<K, V> rebalanceListener(ConsumerRebalanceListener rebalanceListener) {
77176
this.target.setRebalanceListener(rebalanceListener);
78177
return this;
79178
}
80179

180+
/**
181+
* Set to true to include the raw {@link ConsumerRecord} as headers with keys
182+
* {@link KafkaHeaders#RAW_DATA} and
183+
* {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}. enabling callers to have
184+
* access to the record to process errors.
185+
* @param rawMessageHeader true to include the header.
186+
* @return the spec.
187+
*/
81188
public KafkaInboundChannelAdapterSpec<K, V> rawMessageHeader(boolean rawMessageHeader) {
82189
this.target.setRawMessageHeader(rawMessageHeader);
83190
return this;

0 commit comments

Comments
 (0)