Skip to content

Commit 0f20c3b

Browse files
committed
spring-projectsGH-3067: Draft of mapping multiple headers with same key with SimpleKafkaHeaderMapper
1 parent 265e55f commit 0f20c3b

File tree

4 files changed

+197
-74
lines changed

4 files changed

+197
-74
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java

+104-50
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,21 @@
1919
import java.io.IOException;
2020
import java.nio.ByteBuffer;
2121
import java.nio.charset.StandardCharsets;
22+
import java.util.ArrayList;
2223
import java.util.Arrays;
24+
import java.util.Collection;
2325
import java.util.Collections;
2426
import java.util.HashMap;
2527
import java.util.LinkedHashSet;
2628
import java.util.List;
2729
import java.util.Map;
2830
import java.util.Set;
31+
import java.util.stream.Collectors;
2932

3033
import org.apache.kafka.common.header.Header;
3134
import org.apache.kafka.common.header.Headers;
3235
import org.apache.kafka.common.header.internals.RecordHeader;
36+
import org.assertj.core.util.Streams;
3337

3438
import org.springframework.messaging.MessageHeaders;
3539
import org.springframework.util.Assert;
@@ -48,12 +52,14 @@
4852
*
4953
* @author Gary Russell
5054
* @author Artem Bilan
55+
* @author Grzegorz Poznachowski
5156
*
5257
* @since 1.3
53-
*
5458
*/
5559
public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
5660

61+
private static final String ITERABLE_HEADER_TYPE_PATTERN = "%s#%s";
62+
5763
private static final String JAVA_LANG_STRING = "java.lang.String";
5864

5965
private static final Set<String> TRUSTED_ARRAY_TYPES = Set.of(
@@ -96,6 +102,7 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
96102
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
97103
* {@link KafkaHeaders} are never mapped as headers since they represent data in
98104
* consumer/producer records.
105+
*
99106
* @see #DefaultKafkaHeaderMapper(ObjectMapper)
100107
*/
101108
public DefaultKafkaHeaderMapper() {
@@ -110,6 +117,7 @@ public DefaultKafkaHeaderMapper() {
110117
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
111118
* {@link KafkaHeaders} are never mapped as headers since they represent data in
112119
* consumer/producer records.
120+
*
113121
* @param objectMapper the object mapper.
114122
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
115123
*/
@@ -128,6 +136,7 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) {
128136
* generally should not map the {@code "id" and "timestamp"} headers. Note:
129137
* most of the headers in {@link KafkaHeaders} are ever mapped as headers since they
130138
* represent data in consumer/producer records.
139+
*
131140
* @param patterns the patterns.
132141
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
133142
*/
@@ -143,8 +152,9 @@ public DefaultKafkaHeaderMapper(String... patterns) {
143152
* you generally should not map the {@code "id" and "timestamp"} headers. Note: most
144153
* of the headers in {@link KafkaHeaders} are never mapped as headers since they
145154
* represent data in consumer/producer records.
155+
*
146156
* @param objectMapper the object mapper.
147-
* @param patterns the patterns.
157+
* @param patterns the patterns.
148158
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
149159
*/
150160
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) {
@@ -160,6 +170,7 @@ private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, St
160170

161171
/**
162172
* Create an instance for inbound mapping only with pattern matching.
173+
*
163174
* @param patterns the patterns to match.
164175
* @return the header mapper.
165176
* @since 2.8.8
@@ -170,8 +181,9 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patt
170181

171182
/**
172183
* Create an instance for inbound mapping only with pattern matching.
184+
*
173185
* @param objectMapper the object mapper.
174-
* @param patterns the patterns to match.
186+
* @param patterns the patterns to match.
175187
* @return the header mapper.
176188
* @since 2.8.8
177189
*/
@@ -181,6 +193,7 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper o
181193

182194
/**
183195
* Return the object mapper.
196+
*
184197
* @return the mapper.
185198
*/
186199
protected ObjectMapper getObjectMapper() {
@@ -189,6 +202,7 @@ protected ObjectMapper getObjectMapper() {
189202

190203
/**
191204
* Provide direct access to the trusted packages set for subclasses.
205+
*
192206
* @return the trusted packages.
193207
* @since 2.2
194208
*/
@@ -198,6 +212,7 @@ protected Set<String> getTrustedPackages() {
198212

199213
/**
200214
* Provide direct access to the toString() classes by subclasses.
215+
*
201216
* @return the toString() classes.
202217
* @since 2.2
203218
*/
@@ -214,6 +229,7 @@ protected boolean isEncodeStrings() {
214229
* raw String value is converted to a byte array using the configured charset. Set to
215230
* true if a consumer of the outbound record is using Spring for Apache Kafka version
216231
* less than 2.3
232+
*
217233
* @param encodeStrings true to encode (default false).
218234
* @since 2.3
219235
*/
@@ -234,6 +250,7 @@ public void setEncodeStrings(boolean encodeStrings) {
234250
* If any of the supplied packages is {@code "*"}, all packages are trusted.
235251
* If a class for a non-trusted package is encountered, the header is returned to the
236252
* application with value of type {@link NonTrustedHeaderType}.
253+
*
237254
* @param packagesToTrust the packages to trust.
238255
*/
239256
public void addTrustedPackages(String... packagesToTrust) {
@@ -253,6 +270,7 @@ public void addTrustedPackages(String... packagesToTrust) {
253270
/**
254271
* Add class names that the outbound mapper should perform toString() operations on
255272
* before mapping.
273+
*
256274
* @param classNames the class names.
257275
* @since 2.2
258276
*/
@@ -264,32 +282,17 @@ public void addToStringClasses(String... classNames) {
264282
public void fromHeaders(MessageHeaders headers, Headers target) {
265283
final Map<String, String> jsonHeaders = new HashMap<>();
266284
final ObjectMapper headerObjectMapper = getObjectMapper();
267-
headers.forEach((key, rawValue) -> {
268-
if (matches(key, rawValue)) {
269-
Object valueToAdd = headerValueToAddOut(key, rawValue);
270-
if (valueToAdd instanceof byte[]) {
271-
target.add(new RecordHeader(key, (byte[]) valueToAdd));
285+
headers.forEach((key, value) -> {
286+
if (matches(key, value)) {
287+
if (value instanceof Collection<?> values) {
288+
int i = 0;
289+
for (Object element : values) {
290+
resolveSingleHeader(key, element, target, jsonHeaders, i);
291+
i++;
292+
}
272293
}
273294
else {
274-
try {
275-
String className = valueToAdd.getClass().getName();
276-
boolean encodeToJson = this.encodeStrings;
277-
if (this.toStringClasses.contains(className)) {
278-
valueToAdd = valueToAdd.toString();
279-
className = JAVA_LANG_STRING;
280-
encodeToJson = true;
281-
}
282-
if (!encodeToJson && valueToAdd instanceof String) {
283-
target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset())));
284-
}
285-
else {
286-
target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd)));
287-
}
288-
jsonHeaders.put(key, className);
289-
}
290-
catch (Exception e) {
291-
logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName());
292-
}
295+
resolveSingleHeader(key, value, target, jsonHeaders);
293296
}
294297
}
295298
});
@@ -303,30 +306,82 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
303306
}
304307
}
305308

306-
@Override
307-
public void toHeaders(Headers source, final Map<String, Object> headers) {
308-
final Map<String, String> jsonTypes = decodeJsonTypes(source);
309-
source.forEach(header -> {
310-
String headerName = header.key();
311-
if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT) && matchesForInbound(headerName)) {
312-
headers.put(headerName, ByteBuffer.wrap(header.value()).getInt());
313-
}
314-
else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) {
315-
headers.put(headerName, new String(header.value(), getCharset()));
316-
}
317-
else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
318-
if (jsonTypes.containsKey(headerName)) {
319-
String requestedType = jsonTypes.get(headerName);
320-
populateJsonValueHeader(header, requestedType, headers);
309+
private void resolveSingleHeader(String headerName, Object value, Headers target, Map<String, String> jsonHeaders) {
310+
resolveSingleHeader(headerName, value, target, jsonHeaders, null);
311+
}
312+
313+
private void resolveSingleHeader(String headerName, Object value, Headers target, Map<String, String> jsonHeaders, Integer headerIndex) {
314+
Object valueToAdd = headerValueToAddOut(headerName, value);
315+
if (valueToAdd instanceof byte[] byteArray) {
316+
target.add(new RecordHeader(headerName, byteArray));
317+
}
318+
else {
319+
try {
320+
String className = valueToAdd.getClass().getName();
321+
boolean encodeToJson = this.encodeStrings;
322+
if (this.toStringClasses.contains(className)) {
323+
valueToAdd = valueToAdd.toString();
324+
className = JAVA_LANG_STRING;
325+
encodeToJson = true;
326+
}
327+
if (!encodeToJson && valueToAdd instanceof String stringValue) {
328+
target.add(new RecordHeader(headerName, stringValue.getBytes(getCharset())));
321329
}
322330
else {
323-
headers.put(headerName, headerValueToAddIn(header));
331+
target.add(new RecordHeader(headerName, this.objectMapper.writeValueAsBytes(valueToAdd)));
324332
}
333+
jsonHeaders.put(headerIndex == null ?
334+
headerName :
335+
ITERABLE_HEADER_TYPE_PATTERN.formatted(headerName, headerIndex), className);
325336
}
326-
});
337+
catch (Exception e) {
338+
logger.error(e, () -> "Could not map " + headerName + " with type " + value.getClass().getName());
339+
}
340+
}
341+
}
342+
343+
@Override
344+
public void toHeaders(Headers source, final Map<String, Object> target) {
345+
final Map<String, String> jsonTypes = decodeJsonTypes(source);
346+
347+
Streams.stream(source)
348+
.collect(Collectors.groupingBy(Header::key))
349+
.forEach((headerName, headers) -> {
350+
if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT) && matchesForInbound(headerName)) {
351+
target.put(headerName, ByteBuffer.wrap(headers.get(headers.size() - 1).value()).getInt());
352+
}
353+
else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) {
354+
target.put(headerName, new String(headers.get(headers.size() - 1).value(), getCharset()));
355+
}
356+
else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
357+
if (headers.size() == 1) {
358+
if (jsonTypes.containsKey(headerName)) {
359+
String requestedType = jsonTypes.get(headerName);
360+
target.put(headerName, resolveJsonValueHeader(headers.get(0), requestedType));
361+
}
362+
else {
363+
target.put(headerName, headerValueToAddIn(headers.get(0)));
364+
}
365+
}
366+
else {
367+
List<Object> valueList = new ArrayList<>();
368+
for (int i = 0; i < headers.size(); i++) {
369+
var jsonTypeIterableHeader = ITERABLE_HEADER_TYPE_PATTERN.formatted(headerName, i);
370+
if (jsonTypes.containsKey(jsonTypeIterableHeader)) {
371+
String requestedType = jsonTypes.get(jsonTypeIterableHeader);
372+
valueList.add(resolveJsonValueHeader(headers.get(i), requestedType));
373+
}
374+
else {
375+
valueList.add(headerValueToAddIn(headers.get(i)));
376+
}
377+
}
378+
target.put(headerName, valueList);
379+
}
380+
}
381+
});
327382
}
328383

329-
private void populateJsonValueHeader(Header header, String requestedType, Map<String, Object> headers) {
384+
private Object resolveJsonValueHeader(Header header, String requestedType) {
330385
Class<?> type = Object.class;
331386
boolean trusted = false;
332387
try {
@@ -339,22 +394,21 @@ private void populateJsonValueHeader(Header header, String requestedType, Map<St
339394
logger.error(e, () -> "Could not load class for header: " + header.key());
340395
}
341396
if (String.class.equals(type) && (header.value().length == 0 || header.value()[0] != '"')) {
342-
headers.put(header.key(), new String(header.value(), getCharset()));
397+
return new String(header.value(), getCharset());
343398
}
344399
else {
345400
if (trusted) {
346401
try {
347-
Object value = decodeValue(header, type);
348-
headers.put(header.key(), value);
402+
return decodeValue(header, type);
349403
}
350404
catch (IOException e) {
351405
logger.error(e, () ->
352406
"Could not decode json type: " + requestedType + " for key: " + header.key());
353-
headers.put(header.key(), header.value());
407+
return header.value();
354408
}
355409
}
356410
else {
357-
headers.put(header.key(), new NonTrustedHeaderType(header.value(), requestedType));
411+
return new NonTrustedHeaderType(header.value(), requestedType);
358412
}
359413
}
360414
}

0 commit comments

Comments
 (0)