From 3a2ca0fb59a12f490e26f94b72e90b77f84549c2 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 22 May 2025 23:05:56 -0400 Subject: [PATCH] Introduce share consumer factories for Kafka Queues (Early Access) - Preliminary set of changes to support Kafka queueus introduced via KIP-932 (Kafka Queue) for early access in Apache Kafka 4.0.0. See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka - Add ShareConsumerFactory interface and DefaultShareConsumerFactory implementation as the initial building blocks for supporting Kafka Queues (KIP-932) in Spring for Apache Kafka 4.0.x. This factory and the implementation provide a flexible API for creating share consumers, and are designed as the foundation for further queue integration. - Tests to verify the share consumer behavior Related to #3875 https://github.com/spring-projects/spring-kafka/issues/3875 Signed-off-by: Soby Chacko --- .../core/DefaultShareConsumerFactory.java | 351 ++++++++++++++++++ .../kafka/core/ShareConsumerFactory.java | 134 +++++++ .../DefaultShareConsumerFactoryTests.java | 234 ++++++++++++ 3 files changed, 719 insertions(+) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/DefaultShareConsumerFactory.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/ShareConsumerFactory.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultShareConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultShareConsumerFactory.java new file mode 100644 index 0000000000..1646464e45 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultShareConsumerFactory.java @@ -0,0 +1,351 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.serialization.Deserializer; +import org.jspecify.annotations.Nullable; + +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.util.Assert; + +/** + * The {@link ShareConsumerFactory} implementation to produce new {@link ShareConsumer} instances + * for provided {@link Map} {@code configs} and optional {@link Deserializer}s on each + * {@link #createShareConsumer(String, String, String)} invocation. + *

+ * If you are using {@link Deserializer}s that have no-arg constructors and require no setup, then simplest to + * specify {@link Deserializer} classes in the configs passed to the + * {@link DefaultShareConsumerFactory} constructor. + *

+ * If that is not possible, but you are using {@link Deserializer}s that may be shared between all {@link ShareConsumer} + * instances (and specifically that their close() method is a no-op), then you can pass in {@link Deserializer} + * instances for one or both of the key and value deserializers. + *

+ * If neither of the above is true then you may provide a {@link Supplier} for one or both {@link Deserializer}s + * which will be used to obtain {@link Deserializer}(s) each time a {@link ShareConsumer} is created by the factory. + * + * @param the key type. + * @param the value type. + * + * @author Soby Chacko + * @since 4.0 + */ +public class DefaultShareConsumerFactory extends KafkaResourceFactory + implements ShareConsumerFactory, BeanNameAware { + + private final Map configs; + + private @Nullable Supplier<@Nullable Deserializer> keyDeserializerSupplier; + + private @Nullable Supplier<@Nullable Deserializer> valueDeserializerSupplier; + + private boolean configureDeserializers = true; + + private final List> listeners = new ArrayList<>(); + + private String beanName = "not.managed.by.Spring"; + + /** + * Construct a factory with the provided configuration. + * @param configs the configuration. + */ + public DefaultShareConsumerFactory(Map configs) { + this(configs, null, null); + } + + /** + * Construct a factory with the provided configuration and deserializer suppliers. + * When the suppliers are invoked to get an instance, the deserializers' + * {@code configure()} methods will be called with the configuration map. + * @param configs the configuration. + * @param keyDeserializerSupplier the key {@link Deserializer} supplier function (nullable). + * @param valueDeserializerSupplier the value {@link Deserializer} supplier function (nullable). + */ + public DefaultShareConsumerFactory(Map configs, + @Nullable Supplier<@Nullable Deserializer> keyDeserializerSupplier, + @Nullable Supplier<@Nullable Deserializer> valueDeserializerSupplier) { + this(configs, keyDeserializerSupplier, valueDeserializerSupplier, true); + } + + /** + * Construct a factory with the provided configuration and deserializers. + * The deserializers' {@code configure()} methods will be called with the + * configuration map unless {@code configureDeserializers} is false. + * @param configs the configuration. + * @param keyDeserializer the key {@link Deserializer}. + * @param valueDeserializer the value {@link Deserializer}. + * @param configureDeserializers false to not configure the deserializers. + */ + public DefaultShareConsumerFactory(Map configs, + @Nullable Deserializer keyDeserializer, + @Nullable Deserializer valueDeserializer, boolean configureDeserializers) { + this(configs, keyDeserializer != null ? () -> keyDeserializer : null, + valueDeserializer != null ? () -> valueDeserializer : null, configureDeserializers); + } + + /** + * Construct a factory with the provided configuration, deserializer suppliers, and deserializer config flag. + * When the suppliers are invoked to get an instance, the deserializers' + * {@code configure()} methods will be called with the configuration map unless + * {@code configureDeserializers} is false. + * @param configs the configuration. + * @param keyDeserializerSupplier the key {@link Deserializer} supplier function (nullable). + * @param valueDeserializerSupplier the value {@link Deserializer} supplier function (nullable). + * @param configureDeserializers whether to configure deserializers. + */ + public DefaultShareConsumerFactory(Map configs, + @Nullable Supplier<@Nullable Deserializer> keyDeserializerSupplier, + @Nullable Supplier<@Nullable Deserializer> valueDeserializerSupplier, + boolean configureDeserializers) { + this.configs = new ConcurrentHashMap<>(configs); + this.configureDeserializers = configureDeserializers; + this.keyDeserializerSupplier = keyDeserializerSupplier; + this.valueDeserializerSupplier = valueDeserializerSupplier; + } + + @Override + public ShareConsumer createShareConsumer(@Nullable String groupId, @Nullable String clientIdPrefix, + @Nullable String clientIdSuffix) { + return createRawConsumer(groupId, clientIdPrefix, clientIdSuffix); + } + + /** + * Create a {@link ShareConsumer}. + * By default, this method returns an internal {@link ExtendedShareConsumer} + * which is aware of provided listeners, therefore it is recommended + * to extend that class if listeners are still involved for a custom {@link ShareConsumer}. + * @param groupId the group id. + * @param clientIdPrefix the client id prefix. + * @param clientIdSuffix the client id suffix. + * @return the consumer. + */ + protected ShareConsumer createRawConsumer(@Nullable String groupId, @Nullable String clientIdPrefix, + @Nullable String clientIdSuffix) { + Map consumerProperties = new HashMap<>(this.configs); + if (groupId != null) { + consumerProperties.put("group.id", groupId); + } + return new ExtendedShareConsumer(consumerProperties); + } + + @Override + public void setBeanName(String name) { + this.beanName = name; + } + + /** + * Set the key deserializer. The deserializer will be configured using the consumer + * configuration, unless {@link #setConfigureDeserializers(boolean) + * configureDeserializers} is false. + * @param keyDeserializer the deserializer. + */ + public void setKeyDeserializer(@Nullable Deserializer keyDeserializer) { + this.keyDeserializerSupplier = () -> keyDeserializer; + } + + /** + * Set the value deserializer. The deserializer will be configured using the consumer + * configuration, unless {@link #setConfigureDeserializers(boolean) + * configureDeserializers} is false. + * @param valueDeserializer the value deserializer. + */ + public void setValueDeserializer(@Nullable Deserializer valueDeserializer) { + this.valueDeserializerSupplier = () -> valueDeserializer; + } + + @Override + @Nullable + public Deserializer getKeyDeserializer() { + return this.keyDeserializerSupplier != null ? this.keyDeserializerSupplier.get() : null; + } + + @Override + @Nullable + public Deserializer getValueDeserializer() { + return this.valueDeserializerSupplier != null ? this.valueDeserializerSupplier.get() : null; + } + + /** + * Set a supplier to supply instances of the key deserializer. The deserializer will + * be configured using the consumer configuration, unless + * {@link #setConfigureDeserializers(boolean) configureDeserializers} is false. + * @param keyDeserializerSupplier the supplier (nullable). + */ + public void setKeyDeserializerSupplier(@Nullable Supplier<@Nullable Deserializer> keyDeserializerSupplier) { + this.keyDeserializerSupplier = keyDeserializerSupplier; + } + + /** + * Set a supplier to supply instances of the value deserializer. The deserializer will + * be configured using the consumer configuration, unless + * {@link #setConfigureDeserializers(boolean) configureDeserializers} is false. + * @param valueDeserializerSupplier the supplier (nullable). + */ + public void setValueDeserializerSupplier(@Nullable Supplier<@Nullable Deserializer> valueDeserializerSupplier) { + this.valueDeserializerSupplier = valueDeserializerSupplier; + } + + /** + * Set to false (default true) to prevent programmatically provided deserializers (via + * constructor or setters) from being configured using the consumer configuration, + * e.g. if the deserializers are already fully configured. + * @param configureDeserializers false to not configure. + * @see #setKeyDeserializer(Deserializer) + * @see #setKeyDeserializerSupplier(Supplier) + * @see #setValueDeserializer(Deserializer) + * @see #setValueDeserializerSupplier(Supplier) + **/ + public void setConfigureDeserializers(boolean configureDeserializers) { + this.configureDeserializers = configureDeserializers; + } + + /** + * Return whether deserializers are configured automatically. + * @return true if deserializers are configured automatically + */ + public boolean isConfigureDeserializers() { + return this.configureDeserializers; + } + + /** + * Get the current list of listeners. + * @return the listeners. + */ + @Override + public List> getListeners() { + return Collections.unmodifiableList(this.listeners); + } + + /** + * Add a listener. + * @param listener the listener. + */ + @Override + public void addListener(Listener listener) { + Assert.notNull(listener, "'listener' cannot be null"); + this.listeners.add(listener); + } + + /** + * Add a listener at a specific index. + * @param index the index (list position). + * @param listener the listener. + */ + @Override + public void addListener(int index, Listener listener) { + Assert.notNull(listener, "'listener' cannot be null"); + if (index >= this.listeners.size()) { + this.listeners.add(listener); + } + else { + this.listeners.add(index, listener); + } + } + + /** + * Remove a listener. + * @param listener the listener. + * @return true if removed. + */ + @Override + public boolean removeListener(Listener listener) { + return this.listeners.remove(listener); + } + + @Nullable + private Deserializer keyDeserializer(Map configs) { + Deserializer deserializer = + this.keyDeserializerSupplier != null + ? this.keyDeserializerSupplier.get() + : null; + if (deserializer != null && this.configureDeserializers) { + deserializer.configure(configs, true); + } + return deserializer; + } + + @Nullable + private Deserializer valueDeserializer(Map configs) { + Deserializer deserializer = + this.valueDeserializerSupplier != null + ? this.valueDeserializerSupplier.get() + : null; + if (deserializer != null && this.configureDeserializers) { + deserializer.configure(configs, false); + } + return deserializer; + } + + @Override + public Map getConfigurationProperties() { + return Collections.unmodifiableMap(this.configs); + } + + protected class ExtendedShareConsumer extends KafkaShareConsumer { + + private @Nullable String idForListeners; + + protected ExtendedShareConsumer(Map configProps) { + super(configProps, keyDeserializer(configProps), valueDeserializer(configProps)); + + if (!DefaultShareConsumerFactory.this.listeners.isEmpty()) { + Iterator metricIterator = metrics().keySet().iterator(); + String clientId = "unknown"; + if (metricIterator.hasNext()) { + clientId = metricIterator.next().tags().get("client-id"); + } + this.idForListeners = DefaultShareConsumerFactory.this.beanName + "." + clientId; + for (Listener listener : DefaultShareConsumerFactory.this.listeners) { + listener.consumerAdded(this.idForListeners, this); + } + } + } + + @Override + public void close() { + super.close(); + notifyConsumerRemoved(); + } + + @Override + public void close(Duration timeout) { + super.close(timeout); + notifyConsumerRemoved(); + } + + private void notifyConsumerRemoved() { + for (Listener listener : DefaultShareConsumerFactory.this.listeners) { + listener.consumerRemoved(this.idForListeners, this); + } + } + + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ShareConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ShareConsumerFactory.java new file mode 100644 index 0000000000..83a1151490 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ShareConsumerFactory.java @@ -0,0 +1,134 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.common.serialization.Deserializer; +import org.jspecify.annotations.Nullable; + +/** + * The strategy to produce a {@link ShareConsumer} instance for Kafka queue support. + * + * @param the key type. + * @param the value type. + * + * @author Soby Chacko + * @since 4.0 + */ +public interface ShareConsumerFactory { + + /** + * Create a share consumer with the provided group id and client id. + * @param groupId the group id (maybe null). + * @param clientIdPrefix the client id prefix. + * @param clientIdSuffix the client id suffix. + * @return the share consumer. + */ + ShareConsumer createShareConsumer(@Nullable String groupId, @Nullable String clientIdPrefix, + @Nullable String clientIdSuffix); + + /** + * Return an unmodifiable reference to the configuration map for this factory. + * Useful for cloning to make a similar factory. + * @return the configs. + */ + default Map getConfigurationProperties() { + throw new UnsupportedOperationException("'getConfigurationProperties()' is not supported"); + } + + /** + * Return the configured key deserializer (if provided as an object instead + * of a class name in the properties). + * @return the deserializer. + */ + @Nullable + default Deserializer getKeyDeserializer() { + return null; + } + + /** + * Return the configured value deserializer (if provided as an object instead + * of a class name in the properties). + * @return the deserializer. + */ + @Nullable + default Deserializer getValueDeserializer() { + return null; + } + + /** + * Remove a listener. + * @param listener the listener. + * @return true if removed. + */ + default boolean removeListener(Listener listener) { + return false; + } + + /** + * Add a listener at a specific index. + * @param index the index (list position). + * @param listener the listener. + */ + default void addListener(int index, Listener listener) { + } + + /** + * Add a listener. + * @param listener the listener. + */ + default void addListener(Listener listener) { + } + + /** + * Get the current list of listeners. + * @return the listeners. + */ + default List> getListeners() { + return Collections.emptyList(); + } + + /** + * Listener for share consumer lifecycle events. + * + * @param the key type. + * @param the value type. + */ + interface Listener { + + /** + * A new consumer was created. + * @param id the consumer id (factory bean name and client.id separated by a period). + * @param consumer the consumer. + */ + default void consumerAdded(String id, ShareConsumer consumer) { + } + + /** + * An existing consumer was removed. + * @param id the consumer id (factory bean name and client.id separated by a period). + * @param consumer the consumer. + */ + default void consumerRemoved(@Nullable String id, ShareConsumer consumer) { + } + + } +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java new file mode 100644 index 0000000000..7bd735cef0 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java @@ -0,0 +1,234 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Basic tests for {@link DefaultShareConsumerFactory}. + * + * @author Soby Chacko + * @since 4.0 + */ +@EmbeddedKafka( + topics = {"embedded-share-test"}, partitions = 1, + brokerProperties = { + "unstable.api.versions.enable=true", + "group.coordinator.rebalance.protocols=classic,consumer,share", + "share.coordinator.state.topic.replication.factor=1", + "share.coordinator.state.topic.min.isr=1" + }) +@SpringJUnitConfig +@DirtiesContext +class DefaultShareConsumerFactoryTests { + + @Autowired + private EmbeddedKafkaBroker embeddedKafka; + + @Test + void shouldInstantiateWithConfigs() { + Map configs = new HashMap<>(); + configs.put("bootstrap.servers", "localhost:9092"); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(configs); + assertThat(factory).isNotNull(); + assertThat(factory.getConfigurationProperties()).containsKey("bootstrap.servers"); + } + + @Test + void shouldConfigureDeserializersViaSetters() { + var configs = new HashMap(); + var factory = new DefaultShareConsumerFactory(configs); + var keyDeserializer = new StringDeserializer(); + var valueDeserializer = new StringDeserializer(); + factory.setKeyDeserializer(keyDeserializer); + factory.setValueDeserializer(valueDeserializer); + assertThat(factory.getKeyDeserializer()) + .as("Key deserializer should match the set instance") + .isSameAs(keyDeserializer); + assertThat(factory.getValueDeserializer()) + .as("Value deserializer should match the set instance") + .isSameAs(valueDeserializer); + } + + @Test + void shouldConfigureDeserializersViaConstructor() { + var configs = new HashMap(); + var keyDeserializer = new StringDeserializer(); + var valueDeserializer = new StringDeserializer(); + var factory = new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, true); + assertThat(factory.getKeyDeserializer()) + .as("Key deserializer should match the constructor instance") + .isSameAs(keyDeserializer); + assertThat(factory.getValueDeserializer()) + .as("Value deserializer should match the constructor instance") + .isSameAs(valueDeserializer); + } + + @Test + void shouldRegisterAndRemoveListeners() { + var configs = new HashMap(); + var factory = new DefaultShareConsumerFactory(configs); + var listener = new ShareConsumerFactory.Listener() { + + }; + factory.addListener(listener); + assertThat(factory.getListeners()) + .as("Listeners should contain the added listener") + .contains(listener); + factory.removeListener(listener); + assertThat(factory.getListeners()) + .as("Listeners should not contain the removed listener") + .doesNotContain(listener); + } + + @Test + void shouldCreateShareConsumer() { + Map configs = new HashMap<>(); + configs.put("bootstrap.servers", "localhost:9092"); + configs.put("key.deserializer", StringDeserializer.class); + configs.put("value.deserializer", StringDeserializer.class); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(configs); + ShareConsumer shareConsumer = factory.createShareConsumer("group", "prefix", "suffix"); + assertThat(shareConsumer).isNotNull(); + } + + @Test + void shouldSetConfigureDeserializersFlag() { + Map configs = new HashMap<>(); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(configs); + factory.setConfigureDeserializers(false); + assertThat(factory.isConfigureDeserializers()).isFalse(); + } + + @Test + void shouldRespectConfigureDeserializersFlag() { + var configs = new HashMap(); + configs.put("bootstrap.servers", "localhost:9092"); + var keyDeserializer = new StringDeserializer(); + var valueDeserializer = new StringDeserializer(); + var factory = new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, false); + // Should not configure deserializers from config if flag is false + assertThat(factory.isConfigureDeserializers()) + .as("configureDeserializers flag should be false") + .isFalse(); + // Setting the flag to true should update behavior + factory.setConfigureDeserializers(true); + assertThat(factory.isConfigureDeserializers()) + .as("configureDeserializers flag should be true after set") + .isTrue(); + } + + @Test + void shouldReturnUnmodifiableListenersList() { + var configs = new HashMap(); + var factory = new DefaultShareConsumerFactory(configs); + var listener = new ShareConsumerFactory.Listener() { + + }; + factory.addListener(listener); + var listeners = factory.getListeners(); + assertThat(listeners).contains(listener); + // Attempting to modify the returned list should throw + assertThatThrownBy(() -> listeners.add(new ShareConsumerFactory.Listener<>() { + + })) + .as("Listeners list should be unmodifiable") + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + void integrationTestDefaultShareConsumerFactory() throws Exception { + final String topic = "embedded-share-test"; + final String groupId = "testGroup"; + var bootstrapServers = embeddedKafka.getBrokersAsString(); + + var producerProps = new java.util.Properties(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + try (var producer = new KafkaProducer(producerProps)) { + producer.send(new ProducerRecord<>(topic, "key", "integration-test-value")).get(); + } + + Map adminProperties = new HashMap<>(); + adminProperties.put("bootstrap.servers", bootstrapServers); + + // For this test: force new share groups to start from the beginning of the topic. + // This is NOT the same as the usual consumer auto.offset.reset; it's a group config, + // so use AdminClient to set share.auto.offset.reset = earliest for our test group. + try (AdminClient ignored = AdminClient.create(adminProperties)) { + ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest"); + AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET); + + Map> configs = Map.of( + new ConfigResource(ConfigResource.Type.GROUP, "testGroup"), Arrays.asList(op)); + + try (Admin admin = AdminClient.create(adminProperties)) { + admin.incrementalAlterConfigs(configs).all().get(); + } + } + + var consumerProps = new HashMap(); + consumerProps.put("bootstrap.servers", bootstrapServers); + consumerProps.put("key.deserializer", StringDeserializer.class); + consumerProps.put("value.deserializer", StringDeserializer.class); + consumerProps.put("group.id", groupId); + consumerProps.put("share.auto.offset.reset", "earliest"); + + var factory = new DefaultShareConsumerFactory(consumerProps); + var consumer = factory.createShareConsumer(groupId, null, null); + consumer.subscribe(Collections.singletonList(topic)); + + var records = consumer.poll(Duration.ofSeconds(10)); + assertThat(records.count()) + .as("Should have received at least one record") + .isGreaterThan(0); + assertThat(records.iterator().next().value()) + .as("Record value should match") + .isEqualTo("integration-test-value"); + consumer.close(); + } + +}