Skip to content

Commit 3a2ca0f

Browse files
committed
Introduce share consumer factories for Kafka Queues (Early Access)
- Preliminary set of changes to support Kafka queueus introduced via KIP-932 (Kafka Queue) for early access in Apache Kafka 4.0.0. See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka - Add ShareConsumerFactory interface and DefaultShareConsumerFactory implementation as the initial building blocks for supporting Kafka Queues (KIP-932) in Spring for Apache Kafka 4.0.x. This factory and the implementation provide a flexible API for creating share consumers, and are designed as the foundation for further queue integration. - Tests to verify the share consumer behavior Related to spring-projects#3875 spring-projects#3875 Signed-off-by: Soby Chacko <[email protected]>
1 parent 982b532 commit 3a2ca0f

File tree

3 files changed

+719
-0
lines changed

3 files changed

+719
-0
lines changed
Lines changed: 351 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,351 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import java.time.Duration;
20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.HashMap;
23+
import java.util.Iterator;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.function.Supplier;
28+
29+
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
30+
import org.apache.kafka.clients.consumer.ShareConsumer;
31+
import org.apache.kafka.common.MetricName;
32+
import org.apache.kafka.common.serialization.Deserializer;
33+
import org.jspecify.annotations.Nullable;
34+
35+
import org.springframework.beans.factory.BeanNameAware;
36+
import org.springframework.util.Assert;
37+
38+
/**
39+
* The {@link ShareConsumerFactory} implementation to produce new {@link ShareConsumer} instances
40+
* for provided {@link Map} {@code configs} and optional {@link Deserializer}s on each
41+
* {@link #createShareConsumer(String, String, String)} invocation.
42+
* <p>
43+
* If you are using {@link Deserializer}s that have no-arg constructors and require no setup, then simplest to
44+
* specify {@link Deserializer} classes in the configs passed to the
45+
* {@link DefaultShareConsumerFactory} constructor.
46+
* <p>
47+
* If that is not possible, but you are using {@link Deserializer}s that may be shared between all {@link ShareConsumer}
48+
* instances (and specifically that their close() method is a no-op), then you can pass in {@link Deserializer}
49+
* instances for one or both of the key and value deserializers.
50+
* <p>
51+
* If neither of the above is true then you may provide a {@link Supplier} for one or both {@link Deserializer}s
52+
* which will be used to obtain {@link Deserializer}(s) each time a {@link ShareConsumer} is created by the factory.
53+
*
54+
* @param <K> the key type.
55+
* @param <V> the value type.
56+
*
57+
* @author Soby Chacko
58+
* @since 4.0
59+
*/
60+
public class DefaultShareConsumerFactory<K, V> extends KafkaResourceFactory
61+
implements ShareConsumerFactory<K, V>, BeanNameAware {
62+
63+
private final Map<String, Object> configs;
64+
65+
private @Nullable Supplier<@Nullable Deserializer<K>> keyDeserializerSupplier;
66+
67+
private @Nullable Supplier<@Nullable Deserializer<V>> valueDeserializerSupplier;
68+
69+
private boolean configureDeserializers = true;
70+
71+
private final List<Listener<K, V>> listeners = new ArrayList<>();
72+
73+
private String beanName = "not.managed.by.Spring";
74+
75+
/**
76+
* Construct a factory with the provided configuration.
77+
* @param configs the configuration.
78+
*/
79+
public DefaultShareConsumerFactory(Map<String, Object> configs) {
80+
this(configs, null, null);
81+
}
82+
83+
/**
84+
* Construct a factory with the provided configuration and deserializer suppliers.
85+
* When the suppliers are invoked to get an instance, the deserializers'
86+
* {@code configure()} methods will be called with the configuration map.
87+
* @param configs the configuration.
88+
* @param keyDeserializerSupplier the key {@link Deserializer} supplier function (nullable).
89+
* @param valueDeserializerSupplier the value {@link Deserializer} supplier function (nullable).
90+
*/
91+
public DefaultShareConsumerFactory(Map<String, Object> configs,
92+
@Nullable Supplier<@Nullable Deserializer<K>> keyDeserializerSupplier,
93+
@Nullable Supplier<@Nullable Deserializer<V>> valueDeserializerSupplier) {
94+
this(configs, keyDeserializerSupplier, valueDeserializerSupplier, true);
95+
}
96+
97+
/**
98+
* Construct a factory with the provided configuration and deserializers.
99+
* The deserializers' {@code configure()} methods will be called with the
100+
* configuration map unless {@code configureDeserializers} is false.
101+
* @param configs the configuration.
102+
* @param keyDeserializer the key {@link Deserializer}.
103+
* @param valueDeserializer the value {@link Deserializer}.
104+
* @param configureDeserializers false to not configure the deserializers.
105+
*/
106+
public DefaultShareConsumerFactory(Map<String, Object> configs,
107+
@Nullable Deserializer<K> keyDeserializer,
108+
@Nullable Deserializer<V> valueDeserializer, boolean configureDeserializers) {
109+
this(configs, keyDeserializer != null ? () -> keyDeserializer : null,
110+
valueDeserializer != null ? () -> valueDeserializer : null, configureDeserializers);
111+
}
112+
113+
/**
114+
* Construct a factory with the provided configuration, deserializer suppliers, and deserializer config flag.
115+
* When the suppliers are invoked to get an instance, the deserializers'
116+
* {@code configure()} methods will be called with the configuration map unless
117+
* {@code configureDeserializers} is false.
118+
* @param configs the configuration.
119+
* @param keyDeserializerSupplier the key {@link Deserializer} supplier function (nullable).
120+
* @param valueDeserializerSupplier the value {@link Deserializer} supplier function (nullable).
121+
* @param configureDeserializers whether to configure deserializers.
122+
*/
123+
public DefaultShareConsumerFactory(Map<String, Object> configs,
124+
@Nullable Supplier<@Nullable Deserializer<K>> keyDeserializerSupplier,
125+
@Nullable Supplier<@Nullable Deserializer<V>> valueDeserializerSupplier,
126+
boolean configureDeserializers) {
127+
this.configs = new ConcurrentHashMap<>(configs);
128+
this.configureDeserializers = configureDeserializers;
129+
this.keyDeserializerSupplier = keyDeserializerSupplier;
130+
this.valueDeserializerSupplier = valueDeserializerSupplier;
131+
}
132+
133+
@Override
134+
public ShareConsumer<K, V> createShareConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
135+
@Nullable String clientIdSuffix) {
136+
return createRawConsumer(groupId, clientIdPrefix, clientIdSuffix);
137+
}
138+
139+
/**
140+
* Create a {@link ShareConsumer}.
141+
* By default, this method returns an internal {@link ExtendedShareConsumer}
142+
* which is aware of provided listeners, therefore it is recommended
143+
* to extend that class if listeners are still involved for a custom {@link ShareConsumer}.
144+
* @param groupId the group id.
145+
* @param clientIdPrefix the client id prefix.
146+
* @param clientIdSuffix the client id suffix.
147+
* @return the consumer.
148+
*/
149+
protected ShareConsumer<K, V> createRawConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
150+
@Nullable String clientIdSuffix) {
151+
Map<String, Object> consumerProperties = new HashMap<>(this.configs);
152+
if (groupId != null) {
153+
consumerProperties.put("group.id", groupId);
154+
}
155+
return new ExtendedShareConsumer(consumerProperties);
156+
}
157+
158+
@Override
159+
public void setBeanName(String name) {
160+
this.beanName = name;
161+
}
162+
163+
/**
164+
* Set the key deserializer. The deserializer will be configured using the consumer
165+
* configuration, unless {@link #setConfigureDeserializers(boolean)
166+
* configureDeserializers} is false.
167+
* @param keyDeserializer the deserializer.
168+
*/
169+
public void setKeyDeserializer(@Nullable Deserializer<K> keyDeserializer) {
170+
this.keyDeserializerSupplier = () -> keyDeserializer;
171+
}
172+
173+
/**
174+
* Set the value deserializer. The deserializer will be configured using the consumer
175+
* configuration, unless {@link #setConfigureDeserializers(boolean)
176+
* configureDeserializers} is false.
177+
* @param valueDeserializer the value deserializer.
178+
*/
179+
public void setValueDeserializer(@Nullable Deserializer<V> valueDeserializer) {
180+
this.valueDeserializerSupplier = () -> valueDeserializer;
181+
}
182+
183+
@Override
184+
@Nullable
185+
public Deserializer<K> getKeyDeserializer() {
186+
return this.keyDeserializerSupplier != null ? this.keyDeserializerSupplier.get() : null;
187+
}
188+
189+
@Override
190+
@Nullable
191+
public Deserializer<V> getValueDeserializer() {
192+
return this.valueDeserializerSupplier != null ? this.valueDeserializerSupplier.get() : null;
193+
}
194+
195+
/**
196+
* Set a supplier to supply instances of the key deserializer. The deserializer will
197+
* be configured using the consumer configuration, unless
198+
* {@link #setConfigureDeserializers(boolean) configureDeserializers} is false.
199+
* @param keyDeserializerSupplier the supplier (nullable).
200+
*/
201+
public void setKeyDeserializerSupplier(@Nullable Supplier<@Nullable Deserializer<K>> keyDeserializerSupplier) {
202+
this.keyDeserializerSupplier = keyDeserializerSupplier;
203+
}
204+
205+
/**
206+
* Set a supplier to supply instances of the value deserializer. The deserializer will
207+
* be configured using the consumer configuration, unless
208+
* {@link #setConfigureDeserializers(boolean) configureDeserializers} is false.
209+
* @param valueDeserializerSupplier the supplier (nullable).
210+
*/
211+
public void setValueDeserializerSupplier(@Nullable Supplier<@Nullable Deserializer<V>> valueDeserializerSupplier) {
212+
this.valueDeserializerSupplier = valueDeserializerSupplier;
213+
}
214+
215+
/**
216+
* Set to false (default true) to prevent programmatically provided deserializers (via
217+
* constructor or setters) from being configured using the consumer configuration,
218+
* e.g. if the deserializers are already fully configured.
219+
* @param configureDeserializers false to not configure.
220+
* @see #setKeyDeserializer(Deserializer)
221+
* @see #setKeyDeserializerSupplier(Supplier)
222+
* @see #setValueDeserializer(Deserializer)
223+
* @see #setValueDeserializerSupplier(Supplier)
224+
**/
225+
public void setConfigureDeserializers(boolean configureDeserializers) {
226+
this.configureDeserializers = configureDeserializers;
227+
}
228+
229+
/**
230+
* Return whether deserializers are configured automatically.
231+
* @return true if deserializers are configured automatically
232+
*/
233+
public boolean isConfigureDeserializers() {
234+
return this.configureDeserializers;
235+
}
236+
237+
/**
238+
* Get the current list of listeners.
239+
* @return the listeners.
240+
*/
241+
@Override
242+
public List<Listener<K, V>> getListeners() {
243+
return Collections.unmodifiableList(this.listeners);
244+
}
245+
246+
/**
247+
* Add a listener.
248+
* @param listener the listener.
249+
*/
250+
@Override
251+
public void addListener(Listener<K, V> listener) {
252+
Assert.notNull(listener, "'listener' cannot be null");
253+
this.listeners.add(listener);
254+
}
255+
256+
/**
257+
* Add a listener at a specific index.
258+
* @param index the index (list position).
259+
* @param listener the listener.
260+
*/
261+
@Override
262+
public void addListener(int index, Listener<K, V> listener) {
263+
Assert.notNull(listener, "'listener' cannot be null");
264+
if (index >= this.listeners.size()) {
265+
this.listeners.add(listener);
266+
}
267+
else {
268+
this.listeners.add(index, listener);
269+
}
270+
}
271+
272+
/**
273+
* Remove a listener.
274+
* @param listener the listener.
275+
* @return true if removed.
276+
*/
277+
@Override
278+
public boolean removeListener(Listener<K, V> listener) {
279+
return this.listeners.remove(listener);
280+
}
281+
282+
@Nullable
283+
private Deserializer<K> keyDeserializer(Map<String, Object> configs) {
284+
Deserializer<K> deserializer =
285+
this.keyDeserializerSupplier != null
286+
? this.keyDeserializerSupplier.get()
287+
: null;
288+
if (deserializer != null && this.configureDeserializers) {
289+
deserializer.configure(configs, true);
290+
}
291+
return deserializer;
292+
}
293+
294+
@Nullable
295+
private Deserializer<V> valueDeserializer(Map<String, Object> configs) {
296+
Deserializer<V> deserializer =
297+
this.valueDeserializerSupplier != null
298+
? this.valueDeserializerSupplier.get()
299+
: null;
300+
if (deserializer != null && this.configureDeserializers) {
301+
deserializer.configure(configs, false);
302+
}
303+
return deserializer;
304+
}
305+
306+
@Override
307+
public Map<String, Object> getConfigurationProperties() {
308+
return Collections.unmodifiableMap(this.configs);
309+
}
310+
311+
protected class ExtendedShareConsumer extends KafkaShareConsumer<K, V> {
312+
313+
private @Nullable String idForListeners;
314+
315+
protected ExtendedShareConsumer(Map<String, Object> configProps) {
316+
super(configProps, keyDeserializer(configProps), valueDeserializer(configProps));
317+
318+
if (!DefaultShareConsumerFactory.this.listeners.isEmpty()) {
319+
Iterator<MetricName> metricIterator = metrics().keySet().iterator();
320+
String clientId = "unknown";
321+
if (metricIterator.hasNext()) {
322+
clientId = metricIterator.next().tags().get("client-id");
323+
}
324+
this.idForListeners = DefaultShareConsumerFactory.this.beanName + "." + clientId;
325+
for (Listener<K, V> listener : DefaultShareConsumerFactory.this.listeners) {
326+
listener.consumerAdded(this.idForListeners, this);
327+
}
328+
}
329+
}
330+
331+
@Override
332+
public void close() {
333+
super.close();
334+
notifyConsumerRemoved();
335+
}
336+
337+
@Override
338+
public void close(Duration timeout) {
339+
super.close(timeout);
340+
notifyConsumerRemoved();
341+
}
342+
343+
private void notifyConsumerRemoved() {
344+
for (Listener<K, V> listener : DefaultShareConsumerFactory.this.listeners) {
345+
listener.consumerRemoved(this.idForListeners, this);
346+
}
347+
}
348+
349+
}
350+
351+
}

0 commit comments

Comments
 (0)