Skip to content

Commit 2fcb082

Browse files
garyrussellartembilan
authored andcommitted
GH-2321: Support Inbound Header Mapping Matchers
Resolves #2321 **cherry-pick to 2.9.x, 2.8.x**
1 parent 2c5c88c commit 2fcb082

File tree

6 files changed

+198
-34
lines changed

6 files changed

+198
-34
lines changed

Diff for: spring-kafka-docs/src/main/asciidoc/kafka.adoc

+31
Original file line numberDiff line numberDiff line change
@@ -4762,6 +4762,8 @@ public interface KafkaHeaderMapper {
47624762
----
47634763
====
47644764

4765+
The `SimpleKafkaHeaderMapper` maps raw headers as `byte[]`, with configuration options for conversion to `String` values.
4766+
47654767
The `DefaultKafkaHeaderMapper` maps the key to the `MessageHeaders` header name and, in order to support rich header types for outbound messages, JSON conversion is performed.
47664768
A "`special`" header (with a key of `spring_json_header_types`) contains a JSON map of `<key>:<type>`.
47674769
This header is used on the inbound side to provide appropriate conversion of each header value to the original type.
@@ -4850,6 +4852,35 @@ public void testSpecificStringConvert() {
48504852
----
48514853
====
48524854

4855+
Both header mappers map all inbound headers, by default.
4856+
Starting with version 2.8.8, the patterns, can also applied to inbound mapping.
4857+
To create a mapper for inbound mapping, use one of the static methods on the respective mapper:
4858+
4859+
====
4860+
[source, java]
4861+
----
4862+
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
4863+
}
4864+
4865+
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
4866+
}
4867+
4868+
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
4869+
}
4870+
----
4871+
====
4872+
4873+
For example:
4874+
4875+
====
4876+
[source, java]
4877+
----
4878+
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
4879+
----
4880+
====
4881+
4882+
This will exclude all headers beginning with `abc` and include all others.
4883+
48534884
By default, the `DefaultKafkaHeaderMapper` is used in the `MessagingMessageConverter` and `BatchMessagingMessageConverter`, as long as Jackson is on the class path.
48544885

48554886
With the batch converter, the converted headers are available in the `KafkaHeaders.BATCH_CONVERTED_HEADERS` as a `List<Map<String, Object>>` where the map in a position of the list corresponds to the data position in the payload.

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java

+62-19
Original file line numberDiff line numberDiff line change
@@ -59,31 +59,53 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper {
5959
this.rawMappedHeaders.put(KafkaHeaders.LISTENER_INFO, true);
6060
}
6161

62+
private final boolean outbound;
63+
6264
private boolean mapAllStringsOut;
6365

6466
private Charset charset = StandardCharsets.UTF_8;
6567

68+
/**
69+
* Construct a mapper that will match the supplied patterns (outbound) and all headers
70+
* (inbound). For outbound mapping, certain internal framework headers are never
71+
* mapped.
72+
* @param patterns the patterns.
73+
*/
6674
public AbstractKafkaHeaderMapper(String... patterns) {
75+
this(true, patterns);
76+
}
77+
78+
/**
79+
* Construct a mapper that will match the supplied patterns (outbound) and all headers
80+
* (inbound). For outbound mapping, certain internal framework headers are never
81+
* mapped.
82+
* @param outbound true for an outbound mapper.
83+
* @param patterns the patterns.
84+
*/
85+
protected AbstractKafkaHeaderMapper(boolean outbound, String... patterns) {
6786
Assert.notNull(patterns, "'patterns' must not be null");
68-
this.matchers.add(new NeverMatchHeaderMatcher(
69-
KafkaHeaders.ACKNOWLEDGMENT,
70-
KafkaHeaders.CONSUMER,
71-
KafkaHeaders.KEY,
72-
KafkaHeaders.OFFSET,
73-
KafkaHeaders.PARTITION,
74-
KafkaHeaders.RAW_DATA,
75-
KafkaHeaders.RECEIVED_KEY,
76-
KafkaHeaders.RECEIVED_PARTITION,
77-
KafkaHeaders.RECEIVED_TIMESTAMP,
78-
KafkaHeaders.RECEIVED_TOPIC,
79-
KafkaHeaders.TIMESTAMP,
80-
KafkaHeaders.TIMESTAMP_TYPE,
81-
KafkaHeaders.BATCH_CONVERTED_HEADERS,
82-
KafkaHeaders.NATIVE_HEADERS,
83-
KafkaHeaders.TOPIC,
84-
KafkaHeaders.DELIVERY_ATTEMPT,
85-
KafkaHeaders.LISTENER_INFO,
86-
KafkaHeaders.GROUP_ID));
87+
this.outbound = outbound;
88+
if (outbound) {
89+
this.matchers.add(new NeverMatchHeaderMatcher(
90+
KafkaHeaders.ACKNOWLEDGMENT,
91+
KafkaHeaders.CONSUMER,
92+
KafkaHeaders.KEY,
93+
KafkaHeaders.OFFSET,
94+
KafkaHeaders.PARTITION,
95+
KafkaHeaders.RAW_DATA,
96+
KafkaHeaders.RECEIVED_KEY,
97+
KafkaHeaders.RECEIVED_PARTITION,
98+
KafkaHeaders.RECEIVED_TIMESTAMP,
99+
KafkaHeaders.RECEIVED_TOPIC,
100+
KafkaHeaders.TIMESTAMP,
101+
KafkaHeaders.TIMESTAMP_TYPE,
102+
KafkaHeaders.BATCH_CONVERTED_HEADERS,
103+
KafkaHeaders.NATIVE_HEADERS,
104+
KafkaHeaders.TOPIC,
105+
KafkaHeaders.DELIVERY_ATTEMPT,
106+
KafkaHeaders.LISTENER_INFO,
107+
KafkaHeaders.GROUP_ID));
108+
}
87109
for (String pattern : patterns) {
88110
this.matchers.add(new SimplePatternBasedHeaderMatcher(pattern));
89111
}
@@ -168,6 +190,27 @@ protected boolean matches(String header, Object value) {
168190
}
169191

170192
protected boolean matches(String header) {
193+
Assert.state(this.outbound, "This mapper cannot be used for outbound mapping");
194+
return doesMatch(header);
195+
}
196+
197+
/**
198+
* Matches header names for inbound mapping when configured as an inbound mapper.
199+
* @param header the header name.
200+
* @return true if it can be mapped.
201+
* @since 2.8.8
202+
*/
203+
protected boolean matchesForInbound(String header) {
204+
if (this.outbound) {
205+
return true;
206+
}
207+
if (this.matchers.size() == 0) {
208+
return true;
209+
}
210+
return doesMatch(header);
211+
}
212+
213+
private boolean doesMatch(String header) {
171214
for (HeaderMatcher matcher : this.matchers) {
172215
if (matcher.matchHeader(header)) {
173216
return !matcher.isNegated();

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java

+35-9
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,39 @@ public DefaultKafkaHeaderMapper(String... patterns) {
158158
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
159159
*/
160160
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) {
161-
super(patterns);
161+
this(true, objectMapper, patterns);
162+
}
163+
164+
private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, String... patterns) {
165+
super(outbound, patterns);
162166
Assert.notNull(objectMapper, "'objectMapper' must not be null");
163167
Assert.noNullElements(patterns, "'patterns' must not have null elements");
164168
this.objectMapper = objectMapper;
165169
this.objectMapper
166170
.registerModule(new SimpleModule().addDeserializer(MimeType.class, new MimeTypeJsonDeserializer()));
167171
}
168172

173+
/**
174+
* Create an instance for inbound mapping only with pattern matching.
175+
* @param patterns the patterns to match.
176+
* @return the header mapper.
177+
* @since 2.8.8
178+
*/
179+
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
180+
return new DefaultKafkaHeaderMapper(false, JacksonUtils.enhancedObjectMapper(), patterns);
181+
}
182+
183+
/**
184+
* Create an instance for inbound mapping only with pattern matching.
185+
* @param objectMapper the object mapper.
186+
* @param patterns the patterns to match.
187+
* @return the header mapper.
188+
* @since 2.8.8
189+
*/
190+
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
191+
return new DefaultKafkaHeaderMapper(false, objectMapper, patterns);
192+
}
193+
169194
/**
170195
* Return the object mapper.
171196
* @return the mapper.
@@ -288,19 +313,20 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
288313
public void toHeaders(Headers source, final Map<String, Object> headers) {
289314
final Map<String, String> jsonTypes = decodeJsonTypes(source);
290315
source.forEach(header -> {
291-
if (header.key().equals(KafkaHeaders.DELIVERY_ATTEMPT)) {
292-
headers.put(header.key(), ByteBuffer.wrap(header.value()).getInt());
316+
String headerName = header.key();
317+
if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT) && matchesForInbound(headerName)) {
318+
headers.put(headerName, ByteBuffer.wrap(header.value()).getInt());
293319
}
294-
else if (header.key().equals(KafkaHeaders.LISTENER_INFO)) {
295-
headers.put(header.key(), new String(header.value(), getCharset()));
320+
else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) {
321+
headers.put(headerName, new String(header.value(), getCharset()));
296322
}
297-
else if (!(header.key().equals(JSON_TYPES))) {
298-
if (jsonTypes != null && jsonTypes.containsKey(header.key())) {
299-
String requestedType = jsonTypes.get(header.key());
323+
else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
324+
if (jsonTypes != null && jsonTypes.containsKey(headerName)) {
325+
String requestedType = jsonTypes.get(headerName);
300326
populateJsonValueHeader(header, requestedType, headers);
301327
}
302328
else {
303-
headers.put(header.key(), headerValueToAddIn(header));
329+
headers.put(headerName, headerValueToAddIn(header));
304330
}
305331
}
306332
});

Diff for: spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java

+23-6
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,21 @@ public SimpleKafkaHeaderMapper() {
7373
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
7474
*/
7575
public SimpleKafkaHeaderMapper(String... patterns) {
76-
super(patterns);
76+
this(true, patterns);
77+
}
78+
79+
private SimpleKafkaHeaderMapper(boolean outbound, String... patterns) {
80+
super(outbound, patterns);
81+
}
82+
83+
/**
84+
* Create an instance for inbound mapping only with pattern matching.
85+
* @param patterns the patterns to match.
86+
* @return the header mapper.
87+
* @since 2.8.8
88+
*/
89+
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
90+
return new SimpleKafkaHeaderMapper(false, patterns);
7791
}
7892

7993
@Override
@@ -91,11 +105,14 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
91105
@Override
92106
public void toHeaders(Headers source, Map<String, Object> target) {
93107
source.forEach(header -> {
94-
if (header.key().equals(KafkaHeaders.DELIVERY_ATTEMPT)) {
95-
target.put(header.key(), ByteBuffer.wrap(header.value()).getInt());
96-
}
97-
else {
98-
target.put(header.key(), headerValueToAddIn(header));
108+
String headerName = header.key();
109+
if (matchesForInbound(headerName)) {
110+
if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT)) {
111+
target.put(headerName, ByteBuffer.wrap(header.value()).getInt());
112+
}
113+
else {
114+
target.put(headerName, headerValueToAddIn(header));
115+
}
99116
}
100117
});
101118
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java

+20
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,26 @@ void listenerInfo() {
301301
assertThat(headers.lastHeader(KafkaHeaders.LISTENER_INFO)).isNull();
302302
}
303303

304+
@Test
305+
void inboundJson() {
306+
DefaultKafkaHeaderMapper outboundMapper = new DefaultKafkaHeaderMapper();
307+
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!fo*", "*");
308+
HashMap<String, Object> map = new HashMap<>();
309+
map.put("foo", "bar");
310+
map.put("foa", "bar");
311+
map.put("baz", "qux");
312+
MessageHeaders msgHeaders = new MessageHeaders(map);
313+
Headers headers = new RecordHeaders();
314+
outboundMapper.fromHeaders(msgHeaders, headers);
315+
headers.add(KafkaHeaders.DELIVERY_ATTEMPT, new byte[] { 0, 0, 0, 1 });
316+
map.clear();
317+
inboundMapper.toHeaders(headers, map);
318+
assertThat(map).doesNotContainKey("foo")
319+
.doesNotContainKey("foa")
320+
.containsKey(KafkaHeaders.DELIVERY_ATTEMPT)
321+
.containsKey("baz");
322+
}
323+
304324
public static final class Foo {
305325

306326
private String bar = "bar";

Diff for: spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java

+27
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.support;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
2021
import static org.assertj.core.api.Assertions.entry;
2122

2223
import java.nio.ByteBuffer;
@@ -159,4 +160,30 @@ void listenerInfo() {
159160
assertThat(headers.lastHeader(KafkaHeaders.LISTENER_INFO)).isNull();
160161
}
161162

163+
@Test
164+
void inboundMappingNoPatterns() {
165+
SimpleKafkaHeaderMapper inboundMapper = SimpleKafkaHeaderMapper.forInboundOnlyWithMatchers();
166+
Headers headers = new RecordHeaders();
167+
headers.add("foo", "bar".getBytes());
168+
headers.add(KafkaHeaders.DELIVERY_ATTEMPT, new byte[] { 0, 0, 0, 1 });
169+
Map<String, Object> mapped = new HashMap<>();
170+
inboundMapper.toHeaders(headers, mapped);
171+
assertThat(mapped).containsKey("foo")
172+
.containsKey(KafkaHeaders.DELIVERY_ATTEMPT);
173+
assertThatIllegalStateException()
174+
.isThrownBy(() -> inboundMapper.fromHeaders(new MessageHeaders(mapped), headers));
175+
}
176+
177+
@Test
178+
void inboundMappingWithPatterns() {
179+
SimpleKafkaHeaderMapper inboundMapper = SimpleKafkaHeaderMapper.forInboundOnlyWithMatchers("!foo", "*");
180+
Headers headers = new RecordHeaders();
181+
headers.add("foo", "bar".getBytes());
182+
headers.add(KafkaHeaders.DELIVERY_ATTEMPT, new byte[] { 0, 0, 0, 1 });
183+
Map<String, Object> mapped = new HashMap<>();
184+
inboundMapper.toHeaders(headers, mapped);
185+
assertThat(mapped).doesNotContainKey("foo")
186+
.containsKey(KafkaHeaders.DELIVERY_ATTEMPT);
187+
}
188+
162189
}

0 commit comments

Comments
 (0)