Skip to content

Commit 86c9aa1

Browse files
authored
GH-2134: Add filter to @KafkaListener
Resolves #2134 Also clean up parsing of `errorHandler` and `contentTypeConverter`. **cherry-pick to 2.8.x** * Polishing per review comments. * Add info property to KafkaListener. * Rebase; add since.
1 parent a7dbb78 commit 86c9aa1

17 files changed

+304
-65
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/kafka.adoc

+43
Original file line numberDiff line numberDiff line change
@@ -2315,6 +2315,18 @@ In addition, a `FilteringBatchMessageListenerAdapter` is provided, for when you
23152315

23162316
IMPORTANT: The `FilteringBatchMessageListenerAdapter` is ignored if your `@KafkaListener` receives a `ConsumerRecords<?, ?>` instead of `List<ConsumerRecord<?, ?>>`, because `ConsumerRecords` is immutable.
23172317

2318+
Starting with version 2.8.4, you can override the listener container factory's default `RecordFilterStrategy` by using the `filter` property on the listener annotations.
2319+
2320+
====
2321+
[source, java]
2322+
----
2323+
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
2324+
public void listen(Thing thing) {
2325+
...
2326+
}
2327+
----
2328+
====
2329+
23182330
[[retrying-deliveries]]
23192331
===== Retrying Deliveries
23202332

@@ -2594,6 +2606,7 @@ Mutually exclusive; at least one must be provided; enforced by `ContainerPropert
25942606
|See <<transactions>>.
25952607
|===
25962608

2609+
[[alc-props]]
25972610
.`AbstractListenerContainer` Properties
25982611
[cols="9,10,16", options="header"]
25992612
|===
@@ -2651,6 +2664,12 @@ See <<error-handlers>>.
26512664
|See desc.
26522665
|The bean name for user-configured containers or the `id` attribute of `@KafkaListener` s.
26532666

2667+
|[[listenerInfo]]<<listenerInfo,`listenerInfo`>>
2668+
|null
2669+
|A value to populate in the `KafkaHeaders.LISTENER_INFO` header.
2670+
With `@KafkaListener`, this value is obtained from the `info` attribute.
2671+
This header can be used in various places, such as a `RecordInterceptor`, `RecordFilterStrategy` and in the listener code itself.
2672+
26542673
|[[pauseRequested]]<<pauseRequested,`pauseRequested`>>
26552674
|(read only)
26562675
|True if a consumer pause has been requested.
@@ -5544,6 +5563,30 @@ It is disabled by default to avoid the (small) overhead of looking up the state
55445563

55455564
The `DefaultErrorHandler` and `DefaultAfterRollbackProcessor` support this feature.
55465565

5566+
[[li-header]]
5567+
===== Listener Info Header
5568+
5569+
In some cases, it is useful to be able to know which container a listener is running in.
5570+
5571+
Starting with version 2.8.4, you can now set the `listenerInfo` property on the listener container, or set the `info` attribute on the `@KafkaListener` annotation.
5572+
Then, the container will add this in the `KafkaListener.LISTENER_INFO` header to all incoming messages; it can then be used in record interceptors, filters, etc., or in the listener itself.
5573+
5574+
====
5575+
[source, java]
5576+
----
5577+
@KafkaListener(id = "something", topic = "topic", filter = "someFilter",
5578+
info = "this is the something listener")
5579+
public void listen2(@Payload Thing thing,
5580+
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
5581+
...
5582+
}
5583+
----
5584+
====
5585+
5586+
When used in a `RecordInterceptor` or `RecordFilterStrategy` implementation, the header is in the consumer record as a byte array, converted using the `KafkaListenerAnnotationBeanPostProcessor` 's `charSet` property.
5587+
5588+
The header mappers also convert to `String` when creating `MessageHeaders` from the consumer record and never map this header on an outbound record.
5589+
55475590
[[dead-letters]]
55485591
===== Publishing Dead-letter Records
55495592

Diff for: spring-kafka-docs/src/main/asciidoc/whats-new.adoc

+6
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ See <<batch-listener-conv-errors>> for more information.
4444
`RecordFilterStrategy`, when used with batch listeners, can now filter the entire batch in one call.
4545
See the note at the end of <<batch-listeners>> for more information.
4646

47+
The `@KafkaListener` annotation now has the `filter` attribute, to override the container factory's `RecordFilterStrategy` for just this listener.
48+
49+
The `@KafkaListener` annotation now the "info" attribute; this is used to populate the new listener container property `listenerInfo`.
50+
This is then used to populate a `KafkaHeaders.LISTENER_INFO` header in each record which can be used in `RecordInterceptor`, `RecordFilterStrategy`, or the listener itself.
51+
See <<li-header>> and <<alc-props,Abstract Listener Container Properties>> for more information.
52+
4753
[[x28-template]]
4854
==== `KafkaTemplate` Changes
4955

Diff for: spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java

+28-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -297,4 +297,31 @@
297297
*/
298298
String batch() default "";
299299

300+
/**
301+
* Set an {@link org.springframework.kafka.listener.adapter.RecordFilterStrategy} bean
302+
* name to override the strategy configured on the container factory. If a SpEL
303+
* expression is provided ({@code #{...}}), the expression can either evaluate to a
304+
* {@link org.springframework.kafka.listener.adapter.RecordFilterStrategy} instance or
305+
* a bean name.
306+
* @return the error handler.
307+
* @since 2.8.4
308+
*/
309+
String filter() default "";
310+
311+
/**
312+
* Static information that will be added as a header with key
313+
* {@link org.springframework.kafka.support.KafkaHeaders#LISTENER_INFO}. This can be used, for example, in a
314+
* {@link org.springframework.kafka.listener.RecordInterceptor}, {@link RecordFiorg.springframework.kafka.listener.adapter.RecordFilterStrategylterStrategy} or the listener itself, for
315+
* any purposes.
316+
* <p>
317+
* SpEL {@code #{...}} and property place holders {@code ${...}} are supported, but it
318+
* must resolve to a String or `byte[]`.
319+
* <p>
320+
* This header will be stripped out if an outbound record is created with the headers
321+
* from an input record.
322+
* @return the info.
323+
* @since 2.8.4
324+
*/
325+
String info() default "";
326+
300327
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

+58-35
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -85,6 +85,7 @@
8585
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
8686
import org.springframework.kafka.listener.ContainerGroupSequencer;
8787
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
88+
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
8889
import org.springframework.kafka.retrytopic.RetryTopicBootstrapper;
8990
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
9091
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
@@ -143,6 +144,12 @@
143144
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
144145
implements BeanPostProcessor, Ordered, ApplicationContextAware, InitializingBean, SmartInitializingSingleton {
145146

147+
private static final String THE_LEFT = "The [";
148+
149+
private static final String RESOLVED_TO_LEFT = "Resolved to [";
150+
151+
private static final String RIGHT_FOR_LEFT = "] for [";
152+
146153
private static final String GENERATED_ID_PREFIX = "org.springframework.kafka.KafkaListenerEndpointContainer#";
147154

148155
/**
@@ -251,8 +258,8 @@ public void setBeanFactory(BeanFactory beanFactory) {
251258
}
252259

253260
/**
254-
* Set a charset to use when converting byte[] to String in method arguments.
255-
* Default UTF-8.
261+
* Set a charset to use when converting byte[] to String in method arguments and other
262+
* String/byte[] conversions. Default UTF-8.
256263
* @param charset the charset.
257264
* @since 2.2
258265
*/
@@ -489,7 +496,7 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
489496
}
490497

491498
RetryTopicConfigurer.EndpointProcessor endpointProcessor = endpointToProcess ->
492-
this.processKafkaListenerAnnotationForRetryTopic(endpointToProcess, kafkaListener, bean, topics, tps);
499+
this.processKafkaListenerAnnotation(endpointToProcess, kafkaListener, bean, topics, tps);
493500

494501
KafkaListenerContainerFactory<?> factory =
495502
resolveContainerFactory(kafkaListener, resolve(kafkaListener.containerFactory()), beanName);
@@ -560,24 +567,16 @@ private Method checkProxy(Method methodArg, Object bean) {
560567
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
561568
Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
562569

563-
processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean, topics, tps);
570+
processKafkaListenerAnnotation(endpoint, kafkaListener, bean, topics, tps);
564571

565572
String containerFactory = resolve(kafkaListener.containerFactory());
566-
KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener, containerFactory, beanName);
573+
KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener,
574+
containerFactory, beanName);
567575

568576
this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
569-
570-
processKafkaListenerEndpointAfterRegistration(endpoint, kafkaListener);
571-
}
572-
573-
private void processKafkaListenerAnnotationForRetryTopic(MethodKafkaListenerEndpoint<?, ?> endpoint,
574-
KafkaListener kafkaListener, Object bean, String[] topics, TopicPartitionOffset[] tps) {
575-
576-
processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean, topics, tps);
577-
processKafkaListenerEndpointAfterRegistration(endpoint, kafkaListener);
578577
}
579578

580-
private void processKafkaListenerAnnotationBeforeRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint,
579+
private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> endpoint,
581580
KafkaListener kafkaListener, Object bean, String[] topics, TopicPartitionOffset[] tps) {
582581

583582
endpoint.setBean(bean);
@@ -588,6 +587,7 @@ private void processKafkaListenerAnnotationBeforeRegistration(MethodKafkaListene
588587
endpoint.setTopics(topics);
589588
endpoint.setTopicPattern(resolvePattern(kafkaListener));
590589
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
590+
endpoint.setListenerInfo(resolveExpressionAsBytes(kafkaListener.info(), "info"));
591591
String group = kafkaListener.containerGroup();
592592
if (StringUtils.hasText(group)) {
593593
Object resolvedGroup = resolveExpression(group);
@@ -608,20 +608,10 @@ private void processKafkaListenerAnnotationBeforeRegistration(MethodKafkaListene
608608
if (StringUtils.hasText(kafkaListener.batch())) {
609609
endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch()));
610610
}
611-
}
612-
613-
private void processKafkaListenerEndpointAfterRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint,
614-
KafkaListener kafkaListener) {
615-
616611
endpoint.setBeanFactory(this.beanFactory);
617-
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
618-
if (StringUtils.hasText(errorHandlerBeanName)) {
619-
resolveErrorHandler(endpoint, kafkaListener);
620-
}
621-
String converterBeanName = resolveExpressionAsString(kafkaListener.contentTypeConverter(), "contentTypeConverter");
622-
if (StringUtils.hasText(converterBeanName)) {
623-
resolveContentTypeConverter(endpoint, kafkaListener);
624-
}
612+
resolveErrorHandler(endpoint, kafkaListener);
613+
resolveContentTypeConverter(endpoint, kafkaListener);
614+
resolveFilter(endpoint, kafkaListener);
625615
}
626616

627617
private void resolveErrorHandler(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener) {
@@ -653,9 +643,24 @@ private void resolveContentTypeConverter(MethodKafkaListenerEndpoint<?, ?> endpo
653643
}
654644
}
655645

646+
private void resolveFilter(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener) {
647+
Object filter = resolveExpression(kafkaListener.filter());
648+
if (filter instanceof RecordFilterStrategy) {
649+
endpoint.setRecordFilterStrategy((RecordFilterStrategy) filter);
650+
}
651+
else {
652+
String filterBeanName = resolveExpressionAsString(kafkaListener.filter(), "filter");
653+
if (StringUtils.hasText(filterBeanName)) {
654+
endpoint.setRecordFilterStrategy(
655+
this.beanFactory.getBean(filterBeanName, RecordFilterStrategy.class));
656+
}
657+
}
658+
}
659+
656660
@Nullable
657661
private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListener kafkaListener,
658662
Object factoryTarget, String beanName) {
663+
659664
String containerFactory = kafkaListener.containerFactory();
660665
if (!StringUtils.hasText(containerFactory)) {
661666
return null;
@@ -934,8 +939,26 @@ private String resolveExpressionAsString(String value, String attribute) {
934939
return (String) resolved;
935940
}
936941
else if (resolved != null) {
937-
throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "
938-
+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
942+
throw new IllegalStateException(THE_LEFT + attribute + "] must resolve to a String. "
943+
+ RESOLVED_TO_LEFT + resolved.getClass() + RIGHT_FOR_LEFT + value + "]");
944+
}
945+
return null;
946+
}
947+
948+
@Nullable
949+
private byte[] resolveExpressionAsBytes(String value, String attribute) {
950+
Object resolved = resolveExpression(value);
951+
if (resolved instanceof String) {
952+
if (StringUtils.hasText((CharSequence) resolved)) {
953+
return ((String) resolved).getBytes(this.charset);
954+
}
955+
}
956+
else if (resolved instanceof byte[]) {
957+
return (byte[]) resolved;
958+
}
959+
else if (resolved != null) {
960+
throw new IllegalStateException(THE_LEFT + attribute + "] must resolve to a String or byte[]. "
961+
+ RESOLVED_TO_LEFT + resolved.getClass() + RIGHT_FOR_LEFT + value + "]");
939962
}
940963
return null;
941964
}
@@ -951,8 +974,8 @@ else if (resolved instanceof Number) {
951974
}
952975
else if (resolved != null) {
953976
throw new IllegalStateException(
954-
"The [" + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. "
955-
+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
977+
THE_LEFT + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. "
978+
+ RESOLVED_TO_LEFT + resolved.getClass() + RIGHT_FOR_LEFT + value + "]");
956979
}
957980
return result;
958981
}
@@ -968,8 +991,8 @@ else if (resolved instanceof String) {
968991
}
969992
else if (resolved != null) {
970993
throw new IllegalStateException(
971-
"The [" + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. "
972-
+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
994+
THE_LEFT + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. "
995+
+ RESOLVED_TO_LEFT + resolved.getClass() + RIGHT_FOR_LEFT + value + "]");
973996
}
974997
return result;
975998
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -369,10 +369,12 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
369369
return instance;
370370
}
371371

372-
@SuppressWarnings("deprecation")
373372
private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint) {
373+
if (aklEndpoint.getRecordFilterStrategy() == null) {
374+
JavaUtils.INSTANCE
375+
.acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy);
376+
}
374377
JavaUtils.INSTANCE
375-
.acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy)
376378
.acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded)
377379
.acceptIfNotNull(this.statefulRetry, aklEndpoint::setStatefulRetry)
378380
.acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate)
@@ -381,7 +383,6 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint)
381383
if (aklEndpoint.getBatchListener() == null) {
382384
JavaUtils.INSTANCE
383385
.acceptIfNotNull(this.batchListener, aklEndpoint::setBatchListener);
384-
385386
}
386387
}
387388

@@ -431,7 +432,8 @@ else if (this.autoStartup != null) {
431432
.acceptIfHasText(endpoint.getGroupId(), instance.getContainerProperties()::setGroupId)
432433
.acceptIfHasText(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId)
433434
.acceptIfNotNull(endpoint.getConsumerProperties(),
434-
instance.getContainerProperties()::setKafkaConsumerProperties);
435+
instance.getContainerProperties()::setKafkaConsumerProperties)
436+
.acceptIfNotNull(endpoint.getListenerInfo(), instance::setListenerInfo);
435437
}
436438

437439
private void customizeContainer(C instance) {

Diff for: spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

+18
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
116116

117117
private BatchToRecordAdapter<K, V> batchToRecordAdapter;
118118

119+
private byte[] listenerInfo;
120+
119121
@Override
120122
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
121123
this.beanFactory = beanFactory;
@@ -432,6 +434,21 @@ public void setSplitIterables(boolean splitIterables) {
432434
this.splitIterables = splitIterables;
433435
}
434436

437+
@Override
438+
@Nullable
439+
public byte[] getListenerInfo() {
440+
return this.listenerInfo; // NOSONAR
441+
}
442+
443+
/**
444+
* Set the listener info to insert in the record header.
445+
* @param listenerInfo the info.
446+
* @since 2.8.4
447+
*/
448+
public void setListenerInfo(@Nullable byte[] listenerInfo) { // NOSONAR
449+
this.listenerInfo = listenerInfo; // NOSONAR
450+
}
451+
435452
@Nullable
436453
protected BatchToRecordAdapter<K, V> getBatchToRecordAdapter() {
437454
return this.batchToRecordAdapter;
@@ -446,6 +463,7 @@ public void setBatchToRecordAdapter(BatchToRecordAdapter<K, V> batchToRecordAdap
446463
this.batchToRecordAdapter = batchToRecordAdapter;
447464
}
448465

466+
449467
@Override
450468
public void afterPropertiesSet() {
451469
boolean topicsEmpty = getTopics().isEmpty();

0 commit comments

Comments
 (0)