@@ -157,18 +157,9 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
157
157
List <Headers > natives = new ArrayList <>();
158
158
List <ConsumerRecord <?, ?>> raws = new ArrayList <>();
159
159
List <ConversionException > conversionFailures = new ArrayList <>();
160
- if (this .headerMapper != null ) {
161
- rawHeaders .put (KafkaHeaders .BATCH_CONVERTED_HEADERS , convertedHeaders );
162
- }
163
- else {
164
- rawHeaders .put (KafkaHeaders .NATIVE_HEADERS , natives );
165
- }
166
- if (this .rawRecordHeader ) {
167
- rawHeaders .put (KafkaHeaders .RAW_DATA , raws );
168
- }
160
+ addToRawHeaders (rawHeaders , convertedHeaders , natives , raws , conversionFailures );
169
161
commonHeaders (acknowledgment , consumer , rawHeaders , keys , topics , partitions , offsets , timestampTypes ,
170
162
timestamps );
171
- rawHeaders .put (KafkaHeaders .CONVERSION_FAILURES , conversionFailures );
172
163
boolean logged = false ;
173
164
String info = null ;
174
165
for (ConsumerRecord <?, ?> record : records ) {
@@ -210,6 +201,21 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
210
201
return MessageBuilder .createMessage (payloads , kafkaMessageHeaders );
211
202
}
212
203
204
+ private void addToRawHeaders (Map <String , Object > rawHeaders , List <Map <String , Object >> convertedHeaders ,
205
+ List <Headers > natives , List <ConsumerRecord <?, ?>> raws , List <ConversionException > conversionFailures ) {
206
+
207
+ if (this .headerMapper != null ) {
208
+ rawHeaders .put (KafkaHeaders .BATCH_CONVERTED_HEADERS , convertedHeaders );
209
+ }
210
+ else {
211
+ rawHeaders .put (KafkaHeaders .NATIVE_HEADERS , natives );
212
+ }
213
+ if (this .rawRecordHeader ) {
214
+ rawHeaders .put (KafkaHeaders .RAW_DATA , raws );
215
+ }
216
+ rawHeaders .put (KafkaHeaders .CONVERSION_FAILURES , conversionFailures );
217
+ }
218
+
213
219
private Object obtainPayload (Type type , ConsumerRecord <?, ?> record , List <ConversionException > conversionFailures ) {
214
220
return this .recordConverter == null || !containerType (type )
215
221
? extractAndConvertValue (record , type )
0 commit comments