diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultShareConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultShareConsumerFactory.java new file mode 100644 index 0000000000..8f8c537377 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultShareConsumerFactory.java @@ -0,0 +1,355 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.serialization.Deserializer; +import org.jspecify.annotations.Nullable; + +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.util.Assert; + +/** + * The {@link ShareConsumerFactory} implementation to produce new {@link ShareConsumer} instances + * for provided {@link Map} {@code configs} and optional {@link Deserializer}s on each + * {@link #createShareConsumer(String, String)} invocation. + *

+ * If you are using {@link Deserializer}s that have no-arg constructors and require no setup, then simplest to + * specify {@link Deserializer} classes in the configs passed to the + * {@link DefaultShareConsumerFactory} constructor. + *

+ * If that is not possible, but you are using {@link Deserializer}s that may be shared between all {@link ShareConsumer} + * instances (and specifically that their close() method is a no-op), then you can pass in {@link Deserializer} + * instances for one or both of the key and value deserializers. + *

+ * If neither of the above is true then you may provide a {@link Supplier} for one or both {@link Deserializer}s + * which will be used to obtain {@link Deserializer}(s) each time a {@link ShareConsumer} is created by the factory. + * + * @param the key type. + * @param the value type. + * + * @author Soby Chacko + * @since 4.0 + */ +public class DefaultShareConsumerFactory extends KafkaResourceFactory + implements ShareConsumerFactory, BeanNameAware { + + private final Map configs; + + private @Nullable Supplier<@Nullable Deserializer> keyDeserializerSupplier; + + private @Nullable Supplier<@Nullable Deserializer> valueDeserializerSupplier; + + private boolean configureDeserializers = true; + + private final List> listeners = new ArrayList<>(); + + private String beanName = "not.managed.by.Spring"; + + /** + * Construct a factory with the provided configuration. + * @param configs the configuration. + */ + public DefaultShareConsumerFactory(Map configs) { + this(configs, null, null); + } + + /** + * Construct a factory with the provided configuration and deserializer suppliers. + * When the suppliers are invoked to get an instance, the deserializers' + * {@code configure()} methods will be called with the configuration map. + * @param configs the configuration. + * @param keyDeserializerSupplier the key {@link Deserializer} supplier function (nullable). + * @param valueDeserializerSupplier the value {@link Deserializer} supplier function (nullable). + */ + public DefaultShareConsumerFactory(Map configs, + @Nullable Supplier<@Nullable Deserializer> keyDeserializerSupplier, + @Nullable Supplier<@Nullable Deserializer> valueDeserializerSupplier) { + this(configs, keyDeserializerSupplier, valueDeserializerSupplier, true); + } + + /** + * Construct a factory with the provided configuration and deserializers. + * The deserializers' {@code configure()} methods will be called with the + * configuration map unless {@code configureDeserializers} is false. + * @param configs the configuration. + * @param keyDeserializer the key {@link Deserializer}. + * @param valueDeserializer the value {@link Deserializer}. + * @param configureDeserializers false to not configure the deserializers. + */ + public DefaultShareConsumerFactory(Map configs, + @Nullable Deserializer keyDeserializer, + @Nullable Deserializer valueDeserializer, boolean configureDeserializers) { + this(configs, keyDeserializer != null ? () -> keyDeserializer : null, + valueDeserializer != null ? () -> valueDeserializer : null, configureDeserializers); + } + + /** + * Construct a factory with the provided configuration, deserializer suppliers, and deserializer config flag. + * When the suppliers are invoked to get an instance, the deserializers' + * {@code configure()} methods will be called with the configuration map unless + * {@code configureDeserializers} is false. + * @param configs the configuration. + * @param keyDeserializerSupplier the key {@link Deserializer} supplier function (nullable). + * @param valueDeserializerSupplier the value {@link Deserializer} supplier function (nullable). + * @param configureDeserializers whether to configure deserializers. + */ + public DefaultShareConsumerFactory(Map configs, + @Nullable Supplier<@Nullable Deserializer> keyDeserializerSupplier, + @Nullable Supplier<@Nullable Deserializer> valueDeserializerSupplier, + boolean configureDeserializers) { + this.configs = new ConcurrentHashMap<>(configs); + this.configureDeserializers = configureDeserializers; + this.keyDeserializerSupplier = keyDeserializerSupplier; + this.valueDeserializerSupplier = valueDeserializerSupplier; + } + + /** + * Create a share consumer with the provided group id and client id. + * @param groupId the group id (maybe null). + * @param clientId the client id. + * @return the share consumer. + */ + @Override + public ShareConsumer createShareConsumer(@Nullable String groupId, @Nullable String clientId) { + return createRawConsumer(groupId, clientId); + } + + /** + * Actually create the consumer. + * @param groupId the group id (maybe null). + * @param clientId the client id. + * @return the share consumer. + */ + protected ShareConsumer createRawConsumer(@Nullable String groupId, @Nullable String clientId) { + Map consumerProperties = new HashMap<>(this.configs); + if (groupId != null) { + consumerProperties.put("group.id", groupId); + } + if (clientId != null) { + consumerProperties.put("client.id", clientId); + } + return new ExtendedShareConsumer(consumerProperties); + } + + @Override + public void setBeanName(String name) { + this.beanName = name; + } + + /** + * Set the key deserializer. The deserializer will be configured using the consumer + * configuration, unless {@link #setConfigureDeserializers(boolean) + * configureDeserializers} is false. + * @param keyDeserializer the deserializer. + */ + public void setKeyDeserializer(@Nullable Deserializer keyDeserializer) { + this.keyDeserializerSupplier = () -> keyDeserializer; + } + + /** + * Set the value deserializer. The deserializer will be configured using the consumer + * configuration, unless {@link #setConfigureDeserializers(boolean) + * configureDeserializers} is false. + * @param valueDeserializer the value deserializer. + */ + public void setValueDeserializer(@Nullable Deserializer valueDeserializer) { + this.valueDeserializerSupplier = () -> valueDeserializer; + } + + @Override + @Nullable + public Deserializer getKeyDeserializer() { + return this.keyDeserializerSupplier != null ? this.keyDeserializerSupplier.get() : null; + } + + @Override + @Nullable + public Deserializer getValueDeserializer() { + return this.valueDeserializerSupplier != null ? this.valueDeserializerSupplier.get() : null; + } + + /** + * Set a supplier to supply instances of the key deserializer. The deserializer will + * be configured using the consumer configuration, unless + * {@link #setConfigureDeserializers(boolean) configureDeserializers} is false. + * @param keyDeserializerSupplier the supplier (nullable). + */ + public void setKeyDeserializerSupplier(@Nullable Supplier<@Nullable Deserializer> keyDeserializerSupplier) { + this.keyDeserializerSupplier = keyDeserializerSupplier; + } + + /** + * Set a supplier to supply instances of the value deserializer. The deserializer will + * be configured using the consumer configuration, unless + * {@link #setConfigureDeserializers(boolean) configureDeserializers} is false. + * @param valueDeserializerSupplier the supplier (nullable). + */ + public void setValueDeserializerSupplier(@Nullable Supplier<@Nullable Deserializer> valueDeserializerSupplier) { + this.valueDeserializerSupplier = valueDeserializerSupplier; + } + + /** + * Set to false (default true) to prevent programmatically provided deserializers (via + * constructor or setters) from being configured using the consumer configuration, + * e.g. if the deserializers are already fully configured. + * @param configureDeserializers false to not configure. + * @see #setKeyDeserializer(Deserializer) + * @see #setKeyDeserializerSupplier(Supplier) + * @see #setValueDeserializer(Deserializer) + * @see #setValueDeserializerSupplier(Supplier) + **/ + public void setConfigureDeserializers(boolean configureDeserializers) { + this.configureDeserializers = configureDeserializers; + } + + /** + * Get the current list of listeners. + * @return the listeners. + */ + @Override + public List> getListeners() { + return Collections.unmodifiableList(this.listeners); + } + + /** + * Add a listener. + * @param listener the listener. + */ + @Override + public void addListener(Listener listener) { + Assert.notNull(listener, "'listener' cannot be null"); + this.listeners.add(listener); + } + + /** + * Add a listener at a specific index. + *

+ * This method allows insertion of a listener at a particular position in the internal listener list. + * While this enables ordering of listener callbacks (which can be important for certain monitoring or extension scenarios), + * there is intentionally no corresponding {@code removeListener(int index)} contract. Removing listeners by index is + * discouraged because the position of a listener can change if others are added or removed, making it easy to + * accidentally remove the wrong one. Managing listeners by their reference (object) is safer and less error-prone, + * especially as listeners are usually set up once during initialization. + * {@see #removeListener(Listener)} + *

+ * @param index the index (list position). + * @param listener the listener to add. + */ + @Override + public void addListener(int index, Listener listener) { + Assert.notNull(listener, "'listener' cannot be null"); + if (index >= this.listeners.size()) { + this.listeners.add(listener); + } + else { + this.listeners.add(index, listener); + } + } + + /** + * Remove a listener. + * @param listener the listener. + * @return true if removed. + */ + @Override + public boolean removeListener(Listener listener) { + return this.listeners.remove(listener); + } + + @Nullable + private Deserializer keyDeserializer(Map configs) { + Deserializer deserializer = + this.keyDeserializerSupplier != null + ? this.keyDeserializerSupplier.get() + : null; + if (deserializer != null && this.configureDeserializers) { + deserializer.configure(configs, true); + } + return deserializer; + } + + @Nullable + private Deserializer valueDeserializer(Map configs) { + Deserializer deserializer = + this.valueDeserializerSupplier != null + ? this.valueDeserializerSupplier.get() + : null; + if (deserializer != null && this.configureDeserializers) { + deserializer.configure(configs, false); + } + return deserializer; + } + + @Override + public Map getConfigurationProperties() { + return Collections.unmodifiableMap(this.configs); + } + + protected class ExtendedShareConsumer extends KafkaShareConsumer { + + private @Nullable String idForListeners; + + protected ExtendedShareConsumer(Map configProps) { + super(configProps, keyDeserializer(configProps), valueDeserializer(configProps)); + + if (!DefaultShareConsumerFactory.this.listeners.isEmpty()) { + Iterator metricIterator = metrics().keySet().iterator(); + String clientId = "unknown"; + if (metricIterator.hasNext()) { + clientId = metricIterator.next().tags().get("client-id"); + } + this.idForListeners = DefaultShareConsumerFactory.this.beanName + "." + clientId; + for (Listener listener : DefaultShareConsumerFactory.this.listeners) { + listener.consumerAdded(this.idForListeners, this); + } + } + } + + @Override + public void close() { + super.close(); + notifyConsumerRemoved(); + } + + @Override + public void close(Duration timeout) { + super.close(timeout); + notifyConsumerRemoved(); + } + + private void notifyConsumerRemoved() { + for (Listener listener : DefaultShareConsumerFactory.this.listeners) { + listener.consumerRemoved(this.idForListeners, this); + } + } + + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ShareConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ShareConsumerFactory.java new file mode 100644 index 0000000000..25ee560b1f --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ShareConsumerFactory.java @@ -0,0 +1,132 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.common.serialization.Deserializer; +import org.jspecify.annotations.Nullable; + +/** + * The strategy to produce a {@link ShareConsumer} instance for Kafka queue support. + * + * @param the key type. + * @param the value type. + * + * @author Soby Chacko + * @since 4.0 + */ +public interface ShareConsumerFactory { + + /** + * Create a share consumer with the provided group id and client id. + * @param groupId the group id (maybe null). + * @param clientId the client id. + * @return the share consumer. + */ + ShareConsumer createShareConsumer(@Nullable String groupId, @Nullable String clientId); + + /** + * Return an unmodifiable reference to the configuration map for this factory. + * Useful for cloning to make a similar factory. + * @return the configs. + */ + default Map getConfigurationProperties() { + throw new UnsupportedOperationException("'getConfigurationProperties()' is not supported"); + } + + /** + * Return the configured key deserializer (if provided as an object instead + * of a class name in the properties). + * @return the deserializer. + */ + @Nullable + default Deserializer getKeyDeserializer() { + return null; + } + + /** + * Return the configured value deserializer (if provided as an object instead + * of a class name in the properties). + * @return the deserializer. + */ + @Nullable + default Deserializer getValueDeserializer() { + return null; + } + + /** + * Remove a listener. + * @param listener the listener. + * @return true if removed. + */ + default boolean removeListener(Listener listener) { + return false; + } + + /** + * Add a listener at a specific index. + * @param index the index (list position). + * @param listener the listener. + */ + default void addListener(int index, Listener listener) { + } + + /** + * Add a listener. + * @param listener the listener. + */ + default void addListener(Listener listener) { + } + + /** + * Get the current list of listeners. + * @return the listeners. + */ + default List> getListeners() { + return Collections.emptyList(); + } + + /** + * Listener for share consumer lifecycle events. + * + * @param the key type. + * @param the value type. + */ + interface Listener { + + /** + * A new consumer was created. + * @param id the consumer id (factory bean name and client.id separated by a period). + * @param consumer the consumer. + */ + default void consumerAdded(String id, ShareConsumer consumer) { + } + + /** + * An existing consumer was removed. + * @param id the consumer id (factory bean name and client.id separated by a period). + * @param consumer the consumer. + */ + default void consumerRemoved(@Nullable String id, ShareConsumer consumer) { + } + + } +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java new file mode 100644 index 0000000000..b6ea2e5e09 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java @@ -0,0 +1,200 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; + +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Basic tests for {@link DefaultShareConsumerFactory}. + * + * @author Soby Chacko + * @since 4.0 + */ +@EmbeddedKafka( + topics = {"embedded-share-test"}, partitions = 1, + brokerProperties = { + "unstable.api.versions.enable=true", + "group.coordinator.rebalance.protocols=classic,share", + "share.coordinator.state.topic.replication.factor=1", + "share.coordinator.state.topic.min.isr=1" + }) +class DefaultShareConsumerFactoryTests { + + @Test + void shouldInstantiateWithConfigs() { + Map configs = new HashMap<>(); + configs.put("bootstrap.servers", "localhost:9092"); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(configs); + assertThat(factory).isNotNull(); + assertThat(factory.getConfigurationProperties()).containsKey("bootstrap.servers"); + } + + @Test + void shouldConfigureDeserializersViaSetters() { + var configs = new HashMap(); + var factory = new DefaultShareConsumerFactory(configs); + var keyDeserializer = new StringDeserializer(); + var valueDeserializer = new StringDeserializer(); + factory.setKeyDeserializer(keyDeserializer); + factory.setValueDeserializer(valueDeserializer); + assertThat(factory.getKeyDeserializer()) + .as("Key deserializer should match the set instance") + .isSameAs(keyDeserializer); + assertThat(factory.getValueDeserializer()) + .as("Value deserializer should match the set instance") + .isSameAs(valueDeserializer); + } + + @Test + void shouldConfigureDeserializersViaConstructor() { + var configs = new HashMap(); + var keyDeserializer = new StringDeserializer(); + var valueDeserializer = new StringDeserializer(); + var factory = new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, true); + assertThat(factory.getKeyDeserializer()) + .as("Key deserializer should match the constructor instance") + .isSameAs(keyDeserializer); + assertThat(factory.getValueDeserializer()) + .as("Value deserializer should match the constructor instance") + .isSameAs(valueDeserializer); + } + + @Test + void shouldRegisterAndRemoveListeners() { + var configs = new HashMap(); + var factory = new DefaultShareConsumerFactory(configs); + var listener = new ShareConsumerFactory.Listener() { + + }; + factory.addListener(listener); + assertThat(factory.getListeners()) + .as("Listeners should contain the added listener") + .contains(listener); + factory.removeListener(listener); + assertThat(factory.getListeners()) + .as("Listeners should not contain the removed listener") + .doesNotContain(listener); + } + + @Test + void shouldCreateShareConsumer() { + Map configs = new HashMap<>(); + configs.put("bootstrap.servers", "localhost:9092"); + configs.put("key.deserializer", StringDeserializer.class); + configs.put("value.deserializer", StringDeserializer.class); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(configs); + ShareConsumer shareConsumer = factory.createShareConsumer("group", "myapp-client-id"); + assertThat(shareConsumer).isNotNull(); + } + + @Test + void shouldReturnUnmodifiableListenersList() { + var configs = new HashMap(); + var factory = new DefaultShareConsumerFactory(configs); + var listener = new ShareConsumerFactory.Listener() { + + }; + factory.addListener(listener); + var listeners = factory.getListeners(); + assertThat(listeners).contains(listener); + // Attempting to modify the returned list should throw + assertThatThrownBy(() -> listeners.add(new ShareConsumerFactory.Listener<>() { + + })) + .as("Listeners list should be unmodifiable") + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + @SuppressWarnings("try") + void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception { + final String topic = "embedded-share-test"; + final String groupId = "testGroup"; + var bootstrapServers = broker.getBrokersAsString(); + + var producerProps = new java.util.Properties(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + try (var producer = new KafkaProducer(producerProps)) { + producer.send(new ProducerRecord<>(topic, "key", "integration-test-value")).get(); + } + + Map adminProperties = new HashMap<>(); + adminProperties.put("bootstrap.servers", bootstrapServers); + + // For this test: force new share groups to start from the beginning of the topic. + // This is NOT the same as the usual consumer auto.offset.reset; it's a group config, + // so use AdminClient to set share.auto.offset.reset = earliest for our test group. + try (AdminClient ignored = AdminClient.create(adminProperties)) { + ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest"); + AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET); + + Map> configs = Map.of( + new ConfigResource(ConfigResource.Type.GROUP, "testGroup"), Arrays.asList(op)); + + try (Admin admin = AdminClient.create(adminProperties)) { + admin.incrementalAlterConfigs(configs).all().get(); + } + } + + var consumerProps = new HashMap(); + consumerProps.put("bootstrap.servers", bootstrapServers); + consumerProps.put("key.deserializer", StringDeserializer.class); + consumerProps.put("value.deserializer", StringDeserializer.class); + consumerProps.put("group.id", groupId); + + var factory = new DefaultShareConsumerFactory(consumerProps); + var consumer = factory.createShareConsumer(groupId, "myapp-client-id"); + consumer.subscribe(Collections.singletonList(topic)); + + var records = consumer.poll(Duration.ofSeconds(10)); + assertThat(records.count()) + .as("Should have received at least one record") + .isGreaterThan(0); + assertThat(records.iterator().next().value()) + .as("Record value should match") + .isEqualTo("integration-test-value"); + consumer.close(); + } + +}