19
19
import java .io .IOException ;
20
20
import java .nio .ByteBuffer ;
21
21
import java .nio .charset .StandardCharsets ;
22
+ import java .util .ArrayList ;
22
23
import java .util .Arrays ;
23
24
import java .util .Collections ;
24
25
import java .util .HashMap ;
25
26
import java .util .LinkedHashSet ;
26
27
import java .util .List ;
27
28
import java .util .Map ;
28
29
import java .util .Set ;
30
+ import java .util .stream .Collectors ;
29
31
30
32
import org .apache .kafka .common .header .Header ;
31
33
import org .apache .kafka .common .header .Headers ;
32
34
import org .apache .kafka .common .header .internals .RecordHeader ;
35
+ import org .assertj .core .util .Streams ;
33
36
34
37
import org .springframework .messaging .MessageHeaders ;
35
38
import org .springframework .util .Assert ;
49
52
* @author Gary Russell
50
53
* @author Artem Bilan
51
54
* @author Soby Chacko
55
+ * @author Grzegorz Poznachowski
52
56
*
53
57
* @since 1.3
54
- *
55
58
*/
56
59
public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
57
60
61
+ private static final String ITERABLE_HEADER_TYPE_PATTERN = "%s#%s" ;
62
+
58
63
private static final String JAVA_LANG_STRING = "java.lang.String" ;
59
64
60
65
private static final Set <String > TRUSTED_ARRAY_TYPES = Set .of (
@@ -97,6 +102,7 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
97
102
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
98
103
* {@link KafkaHeaders} are never mapped as headers since they represent data in
99
104
* consumer/producer records.
105
+ *
100
106
* @see #DefaultKafkaHeaderMapper(ObjectMapper)
101
107
*/
102
108
public DefaultKafkaHeaderMapper () {
@@ -111,6 +117,7 @@ public DefaultKafkaHeaderMapper() {
111
117
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
112
118
* {@link KafkaHeaders} are never mapped as headers since they represent data in
113
119
* consumer/producer records.
120
+ *
114
121
* @param objectMapper the object mapper.
115
122
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
116
123
*/
@@ -129,6 +136,7 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) {
129
136
* generally should not map the {@code "id" and "timestamp"} headers. Note:
130
137
* most of the headers in {@link KafkaHeaders} are ever mapped as headers since they
131
138
* represent data in consumer/producer records.
139
+ *
132
140
* @param patterns the patterns.
133
141
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
134
142
*/
@@ -144,8 +152,9 @@ public DefaultKafkaHeaderMapper(String... patterns) {
144
152
* you generally should not map the {@code "id" and "timestamp"} headers. Note: most
145
153
* of the headers in {@link KafkaHeaders} are never mapped as headers since they
146
154
* represent data in consumer/producer records.
155
+ *
147
156
* @param objectMapper the object mapper.
148
- * @param patterns the patterns.
157
+ * @param patterns the patterns.
149
158
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
150
159
*/
151
160
public DefaultKafkaHeaderMapper (ObjectMapper objectMapper , String ... patterns ) {
@@ -161,6 +170,7 @@ private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, St
161
170
162
171
/**
163
172
* Create an instance for inbound mapping only with pattern matching.
173
+ *
164
174
* @param patterns the patterns to match.
165
175
* @return the header mapper.
166
176
* @since 2.8.8
@@ -171,8 +181,9 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patt
171
181
172
182
/**
173
183
* Create an instance for inbound mapping only with pattern matching.
184
+ *
174
185
* @param objectMapper the object mapper.
175
- * @param patterns the patterns to match.
186
+ * @param patterns the patterns to match.
176
187
* @return the header mapper.
177
188
* @since 2.8.8
178
189
*/
@@ -182,6 +193,7 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper o
182
193
183
194
/**
184
195
* Return the object mapper.
196
+ *
185
197
* @return the mapper.
186
198
*/
187
199
protected ObjectMapper getObjectMapper () {
@@ -190,6 +202,7 @@ protected ObjectMapper getObjectMapper() {
190
202
191
203
/**
192
204
* Provide direct access to the trusted packages set for subclasses.
205
+ *
193
206
* @return the trusted packages.
194
207
* @since 2.2
195
208
*/
@@ -199,6 +212,7 @@ protected Set<String> getTrustedPackages() {
199
212
200
213
/**
201
214
* Provide direct access to the toString() classes by subclasses.
215
+ *
202
216
* @return the toString() classes.
203
217
* @since 2.2
204
218
*/
@@ -215,6 +229,7 @@ protected boolean isEncodeStrings() {
215
229
* raw String value is converted to a byte array using the configured charset. Set to
216
230
* true if a consumer of the outbound record is using Spring for Apache Kafka version
217
231
* less than 2.3
232
+ *
218
233
* @param encodeStrings true to encode (default false).
219
234
* @since 2.3
220
235
*/
@@ -235,6 +250,7 @@ public void setEncodeStrings(boolean encodeStrings) {
235
250
* If any of the supplied packages is {@code "*"}, all packages are trusted.
236
251
* If a class for a non-trusted package is encountered, the header is returned to the
237
252
* application with value of type {@link NonTrustedHeaderType}.
253
+ *
238
254
* @param packagesToTrust the packages to trust.
239
255
*/
240
256
public void addTrustedPackages (String ... packagesToTrust ) {
@@ -254,6 +270,7 @@ public void addTrustedPackages(String... packagesToTrust) {
254
270
/**
255
271
* Add class names that the outbound mapper should perform toString() operations on
256
272
* before mapping.
273
+ *
257
274
* @param classNames the class names.
258
275
* @since 2.2
259
276
*/
@@ -265,32 +282,15 @@ public void addToStringClasses(String... classNames) {
265
282
public void fromHeaders (MessageHeaders headers , Headers target ) {
266
283
final Map <String , String > jsonHeaders = new HashMap <>();
267
284
final ObjectMapper headerObjectMapper = getObjectMapper ();
268
- headers .forEach ((key , rawValue ) -> {
269
- if (matches (key , rawValue )) {
270
- Object valueToAdd = headerValueToAddOut (key , rawValue );
271
- if (valueToAdd instanceof byte []) {
272
- target .add (new RecordHeader (key , (byte []) valueToAdd ));
285
+ headers .forEach ((key , value ) -> {
286
+ if (matches (key , value )) {
287
+ if (value instanceof List <?> values ) {
288
+ for (int i = 0 ; i < values .size (); i ++) {
289
+ resolveHeader (key , values .get (i ), target , jsonHeaders , i );
290
+ }
273
291
}
274
292
else {
275
- try {
276
- String className = valueToAdd .getClass ().getName ();
277
- boolean encodeToJson = this .encodeStrings ;
278
- if (this .toStringClasses .contains (className )) {
279
- valueToAdd = valueToAdd .toString ();
280
- className = JAVA_LANG_STRING ;
281
- encodeToJson = true ;
282
- }
283
- if (!encodeToJson && valueToAdd instanceof String ) {
284
- target .add (new RecordHeader (key , ((String ) valueToAdd ).getBytes (getCharset ())));
285
- }
286
- else {
287
- target .add (new RecordHeader (key , headerObjectMapper .writeValueAsBytes (valueToAdd )));
288
- }
289
- jsonHeaders .put (key , className );
290
- }
291
- catch (Exception e ) {
292
- logger .error (e , () -> "Could not map " + key + " with type " + rawValue .getClass ().getName ());
293
- }
293
+ resolveHeader (key , value , target , jsonHeaders , null );
294
294
}
295
295
}
296
296
});
@@ -304,34 +304,84 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
304
304
}
305
305
}
306
306
307
- @ Override
308
- public void toHeaders (Headers source , final Map <String , Object > headers ) {
309
- final Map <String , String > jsonTypes = decodeJsonTypes (source );
310
- source .forEach (header -> {
311
- String headerName = header .key ();
312
- if (headerName .equals (KafkaHeaders .DELIVERY_ATTEMPT ) && matchesForInbound (headerName )) {
313
- headers .put (headerName , ByteBuffer .wrap (header .value ()).getInt ());
314
- }
315
- else if (headerName .equals (KafkaHeaders .LISTENER_INFO ) && matchesForInbound (headerName )) {
316
- headers .put (headerName , new String (header .value (), getCharset ()));
317
- }
318
- else if (headerName .equals (KafkaUtils .KEY_DESERIALIZER_EXCEPTION_HEADER ) ||
319
- headerName .equals (KafkaUtils .VALUE_DESERIALIZER_EXCEPTION_HEADER )) {
320
- headers .put (headerName , header );
321
- }
322
- else if (!(headerName .equals (JSON_TYPES )) && matchesForInbound (headerName )) {
323
- if (jsonTypes .containsKey (headerName )) {
324
- String requestedType = jsonTypes .get (headerName );
325
- populateJsonValueHeader (header , requestedType , headers );
307
+ private void resolveHeader (String headerName , Object value , Headers target , Map <String , String > jsonHeaders , Integer headerIndex ) {
308
+ Object valueToAdd = headerValueToAddOut (headerName , value );
309
+ if (valueToAdd instanceof byte [] byteArray ) {
310
+ target .add (new RecordHeader (headerName , byteArray ));
311
+ }
312
+ else {
313
+ try {
314
+ String className = valueToAdd .getClass ().getName ();
315
+ boolean encodeToJson = this .encodeStrings ;
316
+ if (this .toStringClasses .contains (className )) {
317
+ valueToAdd = valueToAdd .toString ();
318
+ className = JAVA_LANG_STRING ;
319
+ encodeToJson = true ;
320
+ }
321
+ if (!encodeToJson && valueToAdd instanceof String stringValue ) {
322
+ target .add (new RecordHeader (headerName , stringValue .getBytes (getCharset ())));
326
323
}
327
324
else {
328
- headers . put ( headerName , headerValueToAddIn ( header ));
325
+ target . add ( new RecordHeader ( headerName , this . objectMapper . writeValueAsBytes ( valueToAdd ) ));
329
326
}
327
+ jsonHeaders .put (headerIndex == null ?
328
+ headerName :
329
+ ITERABLE_HEADER_TYPE_PATTERN .formatted (headerName , headerIndex ), className );
330
330
}
331
- });
331
+ catch (Exception e ) {
332
+ logger .error (e , () -> "Could not map " + headerName + " with type " + value .getClass ().getName ());
333
+ }
334
+ }
335
+ }
336
+
337
+ @ Override
338
+ public void toHeaders (Headers source , final Map <String , Object > target ) {
339
+ final Map <String , String > jsonTypes = decodeJsonTypes (source );
340
+
341
+ Streams .stream (source )
342
+ .collect (Collectors .groupingBy (Header ::key ))
343
+ .forEach ((headerName , headers ) -> {
344
+ Header lastHeader = headers .get (headers .size () - 1 );
345
+ if (headerName .equals (KafkaUtils .KEY_DESERIALIZER_EXCEPTION_HEADER ) ||
346
+ headerName .equals (KafkaUtils .VALUE_DESERIALIZER_EXCEPTION_HEADER )) {
347
+ target .put (headerName , lastHeader );
348
+ }
349
+ else if (headerName .equals (KafkaHeaders .DELIVERY_ATTEMPT ) && matchesForInbound (headerName )) {
350
+ target .put (headerName , ByteBuffer .wrap (lastHeader .value ()).getInt ());
351
+ }
352
+ else if (headerName .equals (KafkaHeaders .LISTENER_INFO ) && matchesForInbound (headerName )) {
353
+ target .put (headerName , new String (lastHeader .value (), getCharset ()));
354
+ }
355
+ else if (!(headerName .equals (JSON_TYPES )) && matchesForInbound (headerName )) {
356
+ if (headers .size () == 1 ) {
357
+ if (jsonTypes .containsKey (headerName )) {
358
+ String requestedType = jsonTypes .get (headerName );
359
+ target .put (headerName , resolveJsonValueHeader (headers .get (0 ), requestedType ));
360
+ }
361
+ else {
362
+ target .put (headerName , headerValueToAddIn (headers .get (0 )));
363
+ }
364
+ }
365
+ else {
366
+ List <Object > valueList = new ArrayList <>();
367
+ for (int i = 0 ; i < headers .size (); i ++) {
368
+ var jsonTypeIterableHeader = ITERABLE_HEADER_TYPE_PATTERN .formatted (headerName , i );
369
+ if (jsonTypes .containsKey (jsonTypeIterableHeader )) {
370
+ String requestedType = jsonTypes .get (jsonTypeIterableHeader );
371
+ valueList .add (resolveJsonValueHeader (headers .get (i ), requestedType ));
372
+ }
373
+ else {
374
+ valueList .add (headerValueToAddIn (headers .get (i )));
375
+ }
376
+ }
377
+ Collections .reverse (valueList );
378
+ target .put (headerName , valueList );
379
+ }
380
+ }
381
+ });
332
382
}
333
383
334
- private void populateJsonValueHeader (Header header , String requestedType , Map < String , Object > headers ) {
384
+ private Object resolveJsonValueHeader (Header header , String requestedType ) {
335
385
Class <?> type = Object .class ;
336
386
boolean trusted = false ;
337
387
try {
@@ -344,22 +394,21 @@ private void populateJsonValueHeader(Header header, String requestedType, Map<St
344
394
logger .error (e , () -> "Could not load class for header: " + header .key ());
345
395
}
346
396
if (String .class .equals (type ) && (header .value ().length == 0 || header .value ()[0 ] != '"' )) {
347
- headers . put ( header . key (), new String (header .value (), getCharset () ));
397
+ return new String (header .value (), getCharset ());
348
398
}
349
399
else {
350
400
if (trusted ) {
351
401
try {
352
- Object value = decodeValue (header , type );
353
- headers .put (header .key (), value );
402
+ return decodeValue (header , type );
354
403
}
355
404
catch (IOException e ) {
356
405
logger .error (e , () ->
357
406
"Could not decode json type: " + requestedType + " for key: " + header .key ());
358
- headers . put ( header . key (), header .value () );
407
+ return header .value ();
359
408
}
360
409
}
361
410
else {
362
- headers . put ( header . key (), new NonTrustedHeaderType (header .value (), requestedType ) );
411
+ return new NonTrustedHeaderType (header .value (), requestedType );
363
412
}
364
413
}
365
414
}
0 commit comments