This section covers the changes made from version 2.7 to version 2.8. For changes in earlier versions, see [history].
Classes and interfaces related to type mapping have been moved from …support.converter to …support.mapping:
- AbstractJavaTypeMapper
- ClassMapper
- DefaultJackson2JavaTypeMapper
- Jackson2JavaTypeMapper
The listener container can now be configured to accept manual offset commits out of order (usually asynchronously). The container will defer the commit until the missing offset is acknowledged. See [ooo-commits] for more information.
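The deferral of out-of-order commits can be pictured as simple bookkeeping over acknowledged offsets. The following is a minimal sketch in plain Java, not the container's implementation; the class DeferredCommitTracker and its method are hypothetical names, and it assumes the offsets of a single partition.

```java
import java.util.TreeSet;

/** Hypothetical illustration only; not part of Spring for Apache Kafka. */
class DeferredCommitTracker {

    // Offsets acknowledged so far that cannot be committed yet because of a gap.
    private final TreeSet<Long> pending = new TreeSet<>();

    // The lowest offset that has not been committed yet.
    private long nextToCommit;

    DeferredCommitTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /**
     * Records an acknowledgment.
     * Returns the offset to commit (the next offset to read), or -1 while a gap remains.
     */
    long acknowledge(long offset) {
        pending.add(offset);
        boolean advanced = false;
        // Advance over every contiguous acknowledged offset.
        while (pending.remove(nextToCommit)) {
            nextToCommit++;
            advanced = true;
        }
        return advanced ? nextToCommit : -1L;
    }
}
```

In this sketch, acknowledging offsets 2 and 1 before 0 commits nothing; once 0 arrives, a single commit covers all three.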
It is now possible to specify whether the listener method is a batch listener on the method itself. This allows the same container factory to be used for both record and batch listeners.
See Batch Listeners for more information.
Batch listeners can now handle conversion exceptions.
See [batch-listener-conv-errors] for more information.
RecordFilterStrategy, when used with batch listeners, can now filter the entire batch in one call.
See the note at the end of Batch Listeners for more information.
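Filtering a whole batch in one call allows decisions that depend on more than one record. The sketch below is plain Java and does not use the RecordFilterStrategy API; the class BatchFilterSketch, its method, and the use of Map.Entry as a stand-in for a record are assumptions made for illustration. It keeps only the most recent value for each key in the batch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; Map.Entry stands in for a consumer record. */
class BatchFilterSketch {

    /** Returns the records to keep: the last record seen for each key. */
    static List<Map.Entry<String, String>> keepLatestPerKey(List<Map.Entry<String, String>> batch) {
        Map<String, Map.Entry<String, String>> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> rec : batch) {
            // Remove first so that the surviving record keeps its arrival position.
            latest.remove(rec.getKey());
            latest.put(rec.getKey(), rec);
        }
        return new ArrayList<>(latest.values());
    }
}
```

A per-record filter could not express this rule without keeping state between calls.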
The @KafkaListener annotation now has the filter attribute, to override the container factory’s RecordFilterStrategy for just this listener.
The @KafkaListener annotation now has the info attribute; this is used to populate the new listener container property listenerInfo.
This is then used to populate a KafkaHeaders.LISTENER_INFO header in each record which can be used in RecordInterceptor, RecordFilterStrategy, or the listener itself.
See [li-header] and Abstract Listener Container Properties for more information.
You can now receive a single record, given the topic, partition and offset. See [kafka-template-receive] for more information.
The legacy GenericErrorHandler and its sub-interface hierarchies for record and batch listeners have been replaced by a new single interface, CommonErrorHandler, with implementations corresponding to most legacy implementations of GenericErrorHandler.
See [error-handlers] and [migrating-legacy-eh] for more information.
The interceptBeforeTx container property is now true by default.
The authorizationExceptionRetryInterval property has been renamed to authExceptionRetryInterval and now applies to AuthenticationExceptions in addition to the AuthorizationExceptions it applied to previously.
Both exceptions are considered fatal and the container will stop by default, unless this property is set.
See [kafka-container] and [container-props] for more information.
The DelegatingByTopicSerializer and DelegatingByTopicDeserializer are now provided.
See [delegating-serialization] for more information.
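The idea of choosing a delegate by topic can be sketched as a lookup over topic patterns. The following plain-Java example is illustrative only and does not use the Kafka Serializer interface; the class ByTopicDelegateSketch, its methods, the first-match-wins rule, and the fallback delegate are assumptions of the sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;

/** Hypothetical illustration only; a Function stands in for a serializer. */
class ByTopicDelegateSketch {

    // Insertion order is preserved, so the first registered matching pattern wins.
    private final Map<Pattern, Function<String, byte[]>> delegates = new LinkedHashMap<>();

    private final Function<String, byte[]> fallback;

    ByTopicDelegateSketch(Function<String, byte[]> fallback) {
        this.fallback = fallback;
    }

    ByTopicDelegateSketch register(String topicRegex, Function<String, byte[]> delegate) {
        delegates.put(Pattern.compile(topicRegex), delegate);
        return this;
    }

    /** Serializes with the first delegate whose pattern matches the whole topic name. */
    byte[] serialize(String topic, String data) {
        for (Map.Entry<Pattern, Function<String, byte[]>> entry : delegates.entrySet()) {
            if (entry.getKey().matcher(topic).matches()) {
                return entry.getValue().apply(data);
            }
        }
        return fallback.apply(data);
    }
}
```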
The property stripPreviousExceptionHeaders is now true by default.
There are now several techniques to customize which headers are added to the output record.
See [dlpr-headers] for more information.
Now you can use the same factory for retryable and non-retryable topics. See [retry-topic-lcf] for more information.
There’s now a manageable global list of fatal exceptions that will make the failed record go straight to the DLT. Refer to [retry-topic-ex-classifier] to see how to manage it.
You can now use blocking and non-blocking retries in conjunction. See [retry-topic-combine-blocking] for more information.
The KafkaBackOffException thrown when using the retryable topics feature is now logged at DEBUG level. See [change-kboe-logging-level] if you need to change the logging level back to WARN or set it to any other level.
This version requires the 2.7.0 kafka-clients.
It is also compatible with the 2.8.0 clients, since version 2.7.1; see [update-deps].
Non-blocking retries are a significant new feature added in this release. When strict ordering is not important, failed deliveries can be sent to another topic to be consumed later. A series of such retry topics can be configured, with increasing delays. See [retry-topic] for more information.
The onlyLogRecordMetadata container property is now true by default.
A new container property stopImmediate is now available.
See [container-props] for more information.
Error handlers that use a BackOff between delivery attempts (e.g. SeekToCurrentErrorHandler and DefaultAfterRollbackProcessor) will now exit the back off interval soon after the container is stopped, rather than delaying the stop.
See [after-rollback] and [seek-to-current] for more information.
Error handlers and after rollback processors that extend FailedRecordProcessor can now be configured with one or more RetryListeners to receive information about retry and recovery progress.
See [after-rollback], [seek-to-current], and [recovering-batch-eh] for more information.
The RecordInterceptor now has additional methods called after the listener returns (normally, or by throwing an exception).
It also has a sub-interface ConsumerAwareRecordInterceptor.
In addition, there is now a BatchInterceptor for batch listeners.
See [message-listener-container] for more information.
You can now validate the payload parameter of @KafkaHandler methods (class-level listeners).
See [kafka-validation] for more information.
You can now set the rawRecordHeader property on the MessagingMessageConverter and BatchMessagingMessageConverter which causes the raw ConsumerRecord to be added to the converted Message<?>.
This is useful, for example, if you wish to use a DeadLetterPublishingRecoverer in a listener error handler.
See [listener-error-handlers] for more information.
You can now modify @KafkaListener annotations during application initialization.
See [kafkalistener-attrs] for more information.
Now, if both the key and value fail deserialization, the original values are published to the DLT.
Previously, the value was populated but the key DeserializationException remained in the headers.
There is a breaking API change, if you subclassed the recoverer and overrode the createProducerRecord method.
In addition, the recoverer verifies that the partition selected by the destination resolver actually exists before publishing to it.
See [dead-letters] for more information.
See [transactions] for more information.
There is now a mechanism to examine a reply and fail the future exceptionally if some condition exists.
Support for sending and receiving spring-messaging Message<?>s has been added.
See [replying-template] for more information.
By default, the StreamsBuilderFactoryBean is now configured to not clean up local state.
See [streams-config] for more information.
New methods createOrModifyTopics and describeTopics have been added.
KafkaAdmin.NewTopics has been added to facilitate configuring multiple topics in a single bean.
See [configuring-topics] for more information.
It is now possible to add a spring-messaging SmartMessageConverter to the MessagingMessageConverter, allowing content negotiation based on the contentType header.
See [messaging-message-conversion] for more information.
See [sequencing] for more information.
A new BackOff implementation is provided, making it more convenient to configure the max retries.
See [exp-backoff] for more information.
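To illustrate what configuring a back off by its maximum number of retries means, the following plain-Java sketch computes the delay before each retry of an exponential back off. It is not the BackOff implementation referred to here; the class BackOffSketch and all parameter names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not a Spring BackOff implementation. */
class BackOffSketch {

    /**
     * Returns one delay (in milliseconds) per retry: the delay grows by the
     * multiplier after each attempt and never exceeds maxInterval.
     */
    static List<Long> delays(long initial, double multiplier, long maxInterval, int maxRetries) {
        List<Long> out = new ArrayList<>();
        long current = initial;
        for (int i = 0; i < maxRetries; i++) {
            out.add(Math.min(current, maxInterval));
            current = (long) Math.min(current * multiplier, (double) maxInterval);
        }
        return out;
    }
}
```

With an initial delay of 1000 ms, a multiplier of 2.0, a cap of 10000 ms and 6 retries, the sketch yields 1000, 2000, 4000, 8000, 10000 and 10000 ms; the number of retries is stated directly rather than derived from a maximum elapsed time.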
New conditional delegating error handlers can be configured to delegate to different error handlers, depending on the exception type. See [cond-eh] for more information.
The default EOSMode is now BETA.
See [exactly-once] for more information.
Various error handlers (that extend FailedRecordProcessor) and the DefaultAfterRollbackProcessor now reset the BackOff if recovery fails.
In addition, you can now select the BackOff to use based on the failed record and/or exception.
See [seek-to-current], [recovering-batch-eh], [dead-letters] and [after-rollback] for more information.
You can now configure an adviceChain in the container properties.
See [container-props] for more information.
When the container is configured to publish ListenerContainerIdleEvents, it now publishes a ListenerContainerNoLongerIdleEvent when a record is received after publishing an idle event.
See [events] and [idle-containers] for more information.
When using manual partition assignment, you can now specify a wildcard for determining which partitions should be reset to the initial offset.
In addition, if the listener implements ConsumerSeekAware, onPartitionsAssigned() is called after the manual assignment.
(Also added in version 2.5.5).
See [manual-assignment] for more information.
Convenience methods have been added to AbstractConsumerSeekAware to make seeking easier.
See Seek for more information.
Subclasses of FailedRecordProcessor (e.g. SeekToCurrentErrorHandler, DefaultAfterRollbackProcessor, RecoveringBatchErrorHandler) can now be configured to reset the retry state if the exception is a different type to that which occurred previously with this record.
See [seek-to-current], [after-rollback], [recovering-batch-eh] for more information.
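Resetting the retry state on a change of exception type can be sketched as follows. This is plain Java for illustration, not the FailedRecordProcessor logic; the class RetryStateSketch, its flag, and its method are hypothetical, and it tracks the state of a single record.

```java
/** Hypothetical illustration only; tracks delivery attempts for one record. */
class RetryStateSketch {

    private final boolean resetOnTypeChange;

    private Class<? extends Exception> lastType;

    private int attempts;

    RetryStateSketch(boolean resetOnTypeChange) {
        this.resetOnTypeChange = resetOnTypeChange;
    }

    /** Registers a failure and returns the attempt count after applying the reset rule. */
    int recordFailure(Exception ex) {
        if (resetOnTypeChange && lastType != null && !lastType.equals(ex.getClass())) {
            // A different kind of failure starts a fresh series of attempts.
            attempts = 0;
        }
        lastType = ex.getClass();
        return ++attempts;
    }
}
```

With the flag enabled, two IllegalStateException failures followed by an IllegalArgumentException give attempt counts of 1, 2 and 1; with it disabled the third count is 3.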
You can now set a maximum age for producers after which they will be closed and recreated. See [transactions] for more information.
You can now update the configuration map after the DefaultKafkaProducerFactory has been created.
This might be useful, for example, if you have to update SSL key/trust store locations after a credentials change.
See [producer-factory] for more information.
This section covers the changes made from version 2.4 to version 2.5. For changes in earlier versions, see [history].
The default consumer and producer factories can now invoke a callback whenever a consumer or producer is created or closed. Implementations for native Micrometer metrics are provided. See [factory-listeners] for more information.
You can now change bootstrap server properties at runtime, enabling failover to another Kafka cluster. See [connecting] for more information.
The factory bean can now invoke a callback whenever a KafkaStreams is created or destroyed.
An implementation for native Micrometer metrics is provided.
See [streams-micrometer] for more information.
There is now an option to add a header which tracks delivery attempts when using certain error handlers and after rollback processors. See [delivery-header] for more information.
Default reply headers will now be populated automatically if needed when a @KafkaListener return type is Message<?>.
See [reply-message] for more information.
The KafkaHeaders.RECEIVED_MESSAGE_KEY is no longer populated with a null value when the incoming record has a null key; the header is omitted altogether.
@KafkaListener methods can now specify a ConsumerRecordMetadata parameter instead of using discrete headers for metadata such as topic, partition, etc.
See [consumer-record-metadata] for more information.
The assignmentCommitOption container property is now LATEST_ONLY_NO_TX by default.
See [container-props] for more information.
The subBatchPerPartition container property is now true by default when using transactions.
See [transactions] for more information.
A new RecoveringBatchErrorHandler is now provided.
See [recovering-batch-eh] for more information.
Static group membership is now supported. See [message-listener-container] for more information.
When incremental/cooperative rebalancing is configured, if offsets fail to commit with a non-fatal RebalanceInProgressException, the container will attempt to re-commit the offsets for the partitions that remain assigned to this instance after the rebalance is completed.
The default error handler is now the SeekToCurrentErrorHandler for record listeners and RecoveringBatchErrorHandler for batch listeners.
See [error-handlers] for more information.
You can now control the level at which exceptions intentionally thrown by standard error handlers are logged. See [error-handlers] for more information.
The getAssignmentsByClientId() method has been added, making it easier to determine which consumers in a concurrent container are assigned which partition(s).
See [container-props] for more information.
You can now suppress logging entire ConsumerRecords in error, debug logs etc.
See onlyLogRecordMetadata in [container-props].
The KafkaTemplate can now maintain Micrometer timers.
See [micrometer] for more information.
The KafkaTemplate can now be configured with ProducerConfig properties to override those in the producer factory.
See [kafka-template] for more information.
A RoutingKafkaTemplate has now been provided.
See [routing-template] for more information.
You can now use KafkaSendCallback instead of ListenableFutureCallback to get a narrower exception, making it easier to extract the failed ProducerRecord.
See [kafka-template] for more information.
New ToStringSerializer/StringDeserializers, as well as an associated SerDe, are now provided.
See [string-serde] for more information.
The JsonDeserializer now has more flexibility to determine the deserialization type.
See [serdes-type-methods] for more information.
The DelegatingSerializer can now handle "standard" types, when the outbound record has no header.
See [delegating-serialization] for more information.
The KafkaTestUtils.consumerProps() helper method now sets ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest by default.
See [junit] for more information.
This version requires the 2.4.0 kafka-clients or higher and supports the new incremental rebalancing feature.
Like ConsumerRebalanceListener, the ConsumerAwareRebalanceListener interface now has an additional method onPartitionsLost.
Refer to the Apache Kafka documentation for more information.
Unlike the ConsumerRebalanceListener, the default implementation does not call onPartitionsRevoked.
Instead, the listener container will call that method after it has called onPartitionsLost; you should not, therefore, do the same when implementing ConsumerAwareRebalanceListener.
See the IMPORTANT note at the end of [rebalance-listeners] for more information.
The KafkaTemplate now supports non-transactional publishing alongside transactional.
See [tx-template-mixed] for more information.
The releaseStrategy is now a BiPredicate.
It is now called after a timeout (as well as when records arrive); the second parameter is true in the case of a call after a timeout.
See [aggregating-request-reply] for more information.
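A release strategy that takes the timeout flag into account might look like the following sketch. It is plain Java and models the strategy as a java.util.function.BiPredicate over a list of strings, which is an assumption of the sketch; the class ReleaseStrategySketch, the method releaseWhen, and the rule itself are hypothetical.

```java
import java.util.List;
import java.util.function.BiPredicate;

/** Hypothetical illustration only; strings stand in for reply records. */
class ReleaseStrategySketch {

    /**
     * Releases when the expected number of replies has arrived, or on a
     * timeout as long as at least one reply has been collected.
     */
    static BiPredicate<List<String>, Boolean> releaseWhen(int expectedReplies) {
        return (replies, timedOut) ->
                replies.size() >= expectedReplies || (timedOut && !replies.isEmpty());
    }
}
```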
The ContainerProperties provides an authorizationExceptionRetryInterval option to let the listener container retry after any AuthorizationException is thrown by the KafkaConsumer.
See its JavaDocs and [kafka-container] for more information.
The @KafkaListener annotation has a new property splitIterables; default true.
When a replying listener returns an Iterable this property controls whether the return result is sent as a single record or a record for each element is sent.
See [annotation-send-to] for more information.
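The effect of such a flag can be sketched in plain Java as follows. This is an illustration of the behavior described, not the framework's reply handling; the class SplitIterablesSketch and its method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; each list element stands for one outgoing record value. */
class SplitIterablesSketch {

    /** Returns the values that would be sent as separate reply records. */
    static List<Object> toReplies(Object result, boolean splitIterables) {
        List<Object> out = new ArrayList<>();
        if (splitIterables && result instanceof Iterable) {
            // One reply per element.
            for (Object element : (Iterable<?>) result) {
                out.add(element);
            }
        }
        else {
            // The whole result as a single reply.
            out.add(result);
        }
        return out;
    }
}
```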
Batch listeners can now be configured with a BatchToRecordAdapter; this allows, for example, the batch to be processed in a transaction while the listener gets one record at a time.
With the default implementation, a ConsumerRecordRecoverer can be used to handle errors within the batch, without stopping the processing of the entire batch - this might be useful when using transactions.
See [transactions-batch] for more information.
The StreamsBuilderFactoryBean accepts a new property KafkaStreamsInfrastructureCustomizer.
This allows configuration of the builder and/or topology before the stream is created.
See [streams-spring] for more information.
This section covers the changes made from version 2.2 to version 2.3.
A new chapter [tips-n-tricks] has been added. Please submit GitHub issues and/or pull requests for additional entries in that chapter.
Starting with version 2.3.4, the missingTopicsFatal container property is false by default.
When this is true, the application fails to start if the broker is down, and many users were affected by this change.
Given that Kafka is a high-availability platform, we did not anticipate that starting an application with no active brokers would be a common use case.
The DefaultKafkaProducerFactory can now be configured to create a producer per thread.
You can also provide Supplier<Serializer> instances in the constructor as an alternative to either configured classes (which require no-arg constructors), or constructing with Serializer instances, which are then shared between all Producers.
See [producer-factory] for more information.
The same option is available with Supplier<Deserializer> instances in DefaultKafkaConsumerFactory.
See [kafka-container] for more information.
Previously, error handlers received ListenerExecutionFailedException (with the actual listener exception as the cause) when the listener was invoked using a listener adapter (such as @KafkaListeners).
Exceptions thrown by native GenericMessageListeners were passed to the error handler unchanged.
Now a ListenerExecutionFailedException is always the argument (with the actual listener exception as the cause), which provides access to the container’s group.id property.
Because the listener container has its own mechanism for committing offsets, it prefers the Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to be false.
It now sets it to false automatically unless specifically set in the consumer factory or the container’s consumer property overrides.
The ackOnError property is now false by default.
See [seek-to-current] for more information.
It is now possible to obtain the consumer’s group.id property in the listener method.
See [listener-group-id] for more information.
The container has a new property recordInterceptor allowing records to be inspected or modified before invoking the listener.
A CompositeRecordInterceptor is also provided in case you need to invoke multiple interceptors.
See [message-listener-container] for more information.
The ConsumerSeekAware has new methods allowing you to perform seeks relative to the beginning, end, or current position and to seek to the first offset greater than or equal to a time stamp.
See Seek for more information.
A convenience class AbstractConsumerSeekAware is now provided to simplify seeking.
See Seek for more information.
The ContainerProperties provides an idleBetweenPolls option to let the main loop in the listener container sleep between KafkaConsumer.poll() calls.
See its JavaDocs and [kafka-container] for more information.
When using AckMode.MANUAL (or MANUAL_IMMEDIATE) you can now cause a redelivery by calling nack on the Acknowledgment.
See [committing-offsets] for more information.
Listener performance can now be monitored using Micrometer Timers.
See [micrometer] for more information.
The containers now publish additional consumer lifecycle events relating to startup. See [events] for more information.
Transactional batch listeners can now support zombie fencing. See [transactions] for more information.
The listener container factory can now be configured with a ContainerCustomizer to further configure each container after it has been created and configured.
See [container-factory] for more information.
The SeekToCurrentErrorHandler now treats certain exceptions as fatal and disables retry for those, invoking the recoverer on first failure.
The SeekToCurrentErrorHandler and SeekToCurrentBatchErrorHandler can now be configured to apply a BackOff (thread sleep) between delivery attempts.
Starting with version 2.3.2, recovered records' offsets will be committed when the error handler returns after recovering a failed record.
See [seek-to-current] for more information.
The DeadLetterPublishingRecoverer, when used in conjunction with an ErrorHandlingDeserializer, now sets the payload of the message sent to the dead-letter topic, to the original value that could not be deserialized.
Previously, it was null and user code needed to extract the DeserializationException from the message headers.
See [dead-letters] for more information.
A new class TopicBuilder is provided for more convenient creation of NewTopic @Beans for automatic topic provisioning.
See [configuring-topics] for more information.
You can now perform additional configuration of the StreamsBuilderFactoryBean created by @EnableKafkaStreams.
See Streams Configuration for more information.
A RecoveringDeserializationExceptionHandler is now provided which allows records with deserialization errors to be recovered.
It can be used in conjunction with a DeadLetterPublishingRecoverer to send these records to a dead-letter topic.
See [streams-deser-recovery] for more information.
The HeaderEnricher transformer has been provided, using SpEL to generate the header values.
See [streams-header-enricher] for more information.
The MessagingTransformer has been provided.
This allows a Kafka streams topology to interact with a spring-messaging component, such as a Spring Integration flow.
See [streams-messaging] and [Calling a Spring Integration Flow from a KStream] for more information.
Now all the JSON-aware components are configured by default with a Jackson ObjectMapper produced by the JacksonUtils.enhancedObjectMapper().
The JsonDeserializer now provides TypeReference-based constructors for better handling of target generic container types.
Also a JacksonMimeTypeModule has been introduced for serialization of org.springframework.util.MimeType to plain string.
See its JavaDocs and [serdes] for more information.
A ByteArrayJsonMessageConverter has been provided as well as a new super class for all Json converters, JsonMessageConverter.
Also, a StringOrBytesSerializer is now available; it can serialize byte[], Bytes and String values in ProducerRecords.
See [messaging-message-conversion] for more information.
When a reply times out, the future is completed exceptionally with a KafkaReplyTimeoutException instead of a KafkaException.
Also, an overloaded sendAndReceive method is now provided that allows specifying the reply timeout on a per message basis.
The new AggregatingReplyingKafkaTemplate extends the ReplyingKafkaTemplate by aggregating replies from multiple receivers.
See [aggregating-request-reply] for more information.
You can now override the producer factory’s transactionIdPrefix on the KafkaTemplate and KafkaTransactionManager.
See [transaction-id-prefix] for more information.
The framework now provides a delegating Serializer and Deserializer, utilizing a header to enable producing and consuming records with multiple key/value types.
See [delegating-serialization] for more information.
The framework now provides a delegating RetryingDeserializer, to retry deserialization when transient errors such as network problems might occur.
See [retrying-deserialization] for more information.
The ContainerProperties class has been moved from org.springframework.kafka.listener.config to org.springframework.kafka.listener.
The AckMode enum has been moved from AbstractMessageListenerContainer to ContainerProperties.
The setBatchErrorHandler() and setErrorHandler() methods have been moved from ContainerProperties to both AbstractMessageListenerContainer and AbstractKafkaListenerContainerFactory.
A new AfterRollbackProcessor strategy is provided.
See [after-rollback] for more information.
You can now use the ConcurrentKafkaListenerContainerFactory to create and configure any ConcurrentMessageListenerContainer, not only those for @KafkaListener annotations.
See [container-factory] for more information.
A new container property (missingTopicsFatal) has been added.
See [kafka-container] for more information.
A ConsumerStoppedEvent is now emitted when a consumer stops.
See [thread-safety] for more information.
Batch listeners can optionally receive the complete ConsumerRecords<?, ?> object instead of a List<ConsumerRecord<?, ?>>.
See Batch Listeners for more information.
The DefaultAfterRollbackProcessor and SeekToCurrentErrorHandler can now recover (skip) records that keep failing, and, by default, do so after 10 failures.
They can be configured to publish failed records to a dead-letter topic.
Starting with version 2.2.4, the consumer’s group ID can be used while selecting the dead letter topic name.
See [after-rollback], [seek-to-current], and [dead-letters] for more information.
The ConsumerStoppingEvent has been added.
See [events] for more information.
The SeekToCurrentErrorHandler can now be configured to commit the offset of a recovered record when the container is configured with AckMode.MANUAL_IMMEDIATE (since 2.2.4).
See [seek-to-current] for more information.
You can now override the concurrency and autoStartup properties of the listener container factory by setting properties on the annotation.
You can now add configuration to determine which headers (if any) are copied to a reply message.
See [kafka-listener-annotation] for more information.
You can now use @KafkaListener as a meta-annotation on your own annotations.
See [kafka-listener-meta] for more information.
It is now easier to configure a Validator for @Payload validation.
See [kafka-validation] for more information.
You can now specify Kafka consumer properties directly on the annotation; these will override any properties with the same name defined in the consumer factory (since version 2.2.4). See [annotation-properties] for more information.
Headers of type MimeType and MediaType are now mapped as simple strings in the RecordHeader value.
Previously, they were mapped as JSON and only MimeType was decoded.
MediaType could not be decoded.
They are now simple strings for interoperability.
Also, the DefaultKafkaHeaderMapper has a new addToStringClasses method, allowing the specification of types that should be mapped by using toString() instead of JSON.
See [headers] for more information.
The KafkaEmbedded class and its KafkaRule interface have been deprecated in favor of the EmbeddedKafkaBroker and its JUnit 4 EmbeddedKafkaRule wrapper.
The @EmbeddedKafka annotation now populates an EmbeddedKafkaBroker bean instead of the deprecated KafkaEmbedded.
This change allows the use of @EmbeddedKafka in JUnit 5 tests.
The @EmbeddedKafka annotation now has the attribute ports to specify the port that populates the EmbeddedKafkaBroker.
See [testing] for more information.
You can now provide type mapping information by using producer and consumer properties.
New constructors are available on the deserializer to allow overriding the type header information with the supplied target type.
The JsonDeserializer now removes any type information headers by default.
You can now configure the JsonDeserializer to ignore type information headers by using a Kafka property (since 2.2.3).
See [serdes] for more information.
The streams configuration bean must now be a KafkaStreamsConfiguration object instead of a StreamsConfig object.
The StreamsBuilderFactoryBean has been moved from package …core to …config.
The KafkaStreamBrancher has been introduced for a better end-user experience when conditional branches are built on top of a KStream instance.
See [streams-kafka-streams] and [streams-config] for more information.
When a transaction is started by the listener container, the transactional.id is now the transactionIdPrefix appended with <group.id>.<topic>.<partition>.
This change allows proper fencing of zombies, as described here.
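The composition of the transactional.id described above can be written out as a one-line helper. This is a plain-Java sketch for illustration; the class TransactionalIdSketch and its method are hypothetical and only reproduce the <group.id>.<topic>.<partition> layout stated in the text.

```java
/** Hypothetical illustration only; mirrors the layout described in the text. */
class TransactionalIdSketch {

    /** Builds prefix + group.id + "." + topic + "." + partition. */
    static String transactionalId(String transactionIdPrefix, String groupId, String topic, int partition) {
        return transactionIdPrefix + groupId + "." + topic + "." + partition;
    }
}
```

Because the result depends only on the group, topic, and partition, whichever instance is assigned a partition after a rebalance uses the same transactional.id as the previous owner, which is what lets the broker fence the old producer.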
This version requires the 1.0.0 kafka-clients or higher.
The 1.1.x client is supported natively in version 2.2.
The StringJsonMessageConverter and JsonSerializer now add type information in Headers, letting the converter and JsonDeserializer create specific types on reception, based on the message itself rather than a fixed configured type.
See [serdes] for more information.
Container error handlers are now provided for both record and batch listeners that treat any exceptions thrown by the listener as fatal; they stop the container. See [annotation-error-handling] for more information.
The listener containers now have pause() and resume() methods (since version 2.1.3).
See [pause-resume] for more information.
Starting with version 2.1.3, you can configure stateful retry. See Stateful Retry for more information.
Starting with version 2.1.1, you can now set the client.id prefix on @KafkaListener.
Previously, to customize the client ID, you needed a separate consumer factory (and container factory) per listener.
The prefix is suffixed with -n to provide unique client IDs when you use concurrency.
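The -n suffixing can be illustrated with a short plain-Java sketch. The class ClientIdSketch and its method are hypothetical, and numbering the consumers from zero is an assumption of the sketch rather than something stated in the text.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; zero-based numbering is an assumption. */
class ClientIdSketch {

    /** Returns one client ID per concurrent consumer: prefix-0, prefix-1, ... */
    static List<String> clientIds(String prefix, int concurrency) {
        List<String> ids = new ArrayList<>();
        for (int n = 0; n < concurrency; n++) {
            ids.add(prefix + "-" + n);
        }
        return ids;
    }
}
```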
By default, logging of topic offset commits is performed with the DEBUG logging level.
Starting with version 2.1.2, a new property in ContainerProperties called commitLogLevel lets you specify the log level for these messages.
See [kafka-container] for more information.
Starting with version 2.1.3, you can designate one of the @KafkaHandler annotations on a class-level @KafkaListener as the default.
See [class-level-kafkalistener] for more information.
Starting with version 2.1.3, a subclass of KafkaTemplate is provided to support request/reply semantics.
See [replying-template] for more information.
Version 2.1.3 introduced the ChainedKafkaTransactionManager.
(It is now deprecated).
See the 2.0 to 2.1 Migration guide.
The Spring for Apache Kafka project now requires Spring Framework 5.0 and Java 8.
You can now annotate @KafkaListener methods (and classes and @KafkaHandler methods) with @SendTo.
If the method returns a result, it is forwarded to the specified topic.
See [annotation-send-to] for more information.
Message listeners can now be aware of the Consumer object.
See Message Listeners for more information.
Rebalance listeners can now access the Consumer object during rebalance notifications.
See [rebalance-listeners] for more information.
The 0.11.0.0 client library added support for transactions.
The KafkaTransactionManager and other support for transactions have been added.
See [transactions] for more information.
The 0.11.0.0 client library added support for message headers.
These can now be mapped to and from spring-messaging MessageHeaders.
See [headers] for more information.
The 0.11.0.0 client library provides an AdminClient, which you can use to create topics.
The KafkaAdmin uses this client to automatically add topics defined as @Bean instances.
KafkaTemplate now supports an API to add records with timestamps.
New KafkaHeaders have been introduced regarding timestamp support.
Also, new KafkaConditions.timestamp() and KafkaMatchers.hasTimestamp() testing utilities have been added.
See [kafka-template], [kafka-listener-annotation], and [testing] for more details.
You can now configure a KafkaListenerErrorHandler to handle exceptions.
See [annotation-error-handling] for more information.
By default, the @KafkaListener id property is now used as the group.id property, overriding the property configured in the consumer factory (if present).
Further, you can explicitly configure the groupId on the annotation.
Previously, you would have needed a separate container factory (and consumer factory) to use different group.id values for listeners.
To restore the previous behavior of using the factory configured group.id, set the idIsGroup property on the annotation to false.
For convenience, a test class-level @EmbeddedKafka annotation is provided, to register KafkaEmbedded as a bean.
See [testing] for more information.
Support for configuring Kerberos is now provided. See [kerberos] for more information.
Listeners can be configured to receive the entire batch of messages returned by the consumer.poll() operation, rather than one at a time.
When explicitly assigning partitions, you can now configure the initial offset relative to the current position for the consumer group, rather than absolute or relative to the current end.
You can now seek the position of each topic or partition. You can use this to set the initial position during initialization when group management is in use and Kafka assigns the partitions. You can also seek when an idle container is detected or at any arbitrary point in your application’s execution. See Seek for more information.