21
21
import java .nio .ByteBuffer ;
22
22
import java .nio .charset .StandardCharsets ;
23
23
import java .time .Duration ;
24
- import java .util .BitSet ;
25
24
import java .util .Collections ;
25
+ import java .util .EnumSet ;
26
26
import java .util .List ;
27
27
import java .util .Map ;
28
28
import java .util .Optional ;
@@ -88,7 +88,7 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement
88
88
89
89
private final Function <ProducerRecord <?, ?>, KafkaOperations <?, ?>> templateResolver ;
90
90
91
- private final BitSet whichHeaders = new BitSet ( 10 );
91
+ private final EnumSet < HeaderNames . HeadersToAdd > whichHeaders = EnumSet . allOf ( HeaderNames . HeadersToAdd . class );
92
92
93
93
private boolean retainExceptionHeader ;
94
94
@@ -185,7 +185,6 @@ public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Obj
185
185
.map (t -> t .isTransactional ())
186
186
.allMatch (t -> t .equals (tx )), "All templates must have the same setting for transactional" );
187
187
this .destinationResolver = destinationResolver ;
188
- setHeaderBits (this .whichHeaders );
189
188
}
190
189
191
190
/**
@@ -210,12 +209,6 @@ public DeadLetterPublishingRecoverer(Function<ProducerRecord<?, ?>, KafkaOperati
210
209
this .transactional = transactional ;
211
210
this .destinationResolver = destinationResolver ;
212
211
this .templateResolver = templateResolver ;
213
- setHeaderBits (this .whichHeaders );
214
- }
215
-
216
- private static void setHeaderBits (BitSet bits ) {
217
- bits .set (HeaderNames .HeadersToAdd .OFFSET .ordinal (),
218
- (HeaderNames .HeadersToAdd .EX_STACKTRACE .ordinal ()) + 1 );
219
212
}
220
213
221
214
/**
@@ -368,7 +361,7 @@ public void excludeHeader(HeaderNames.HeadersToAdd... headers) {
368
361
Assert .notNull (headers , "'headers' cannot be null" );
369
362
Assert .noNullElements (headers , "'headers' cannot include null elements" );
370
363
for (HeaderNames .HeadersToAdd header : headers ) {
371
- this .whichHeaders .clear (header . ordinal () );
364
+ this .whichHeaders .remove (header );
372
365
}
373
366
}
374
367
@@ -381,7 +374,7 @@ public void includeHeader(HeaderNames.HeadersToAdd... headers) {
381
374
Assert .notNull (headers , "'headers' cannot be null" );
382
375
Assert .noNullElements (headers , "'headers' cannot include null elements" );
383
376
for (HeaderNames .HeadersToAdd header : headers ) {
384
- this .whichHeaders .set (header . ordinal () );
377
+ this .whichHeaders .add (header );
385
378
}
386
379
}
387
380
@@ -681,7 +674,7 @@ private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?>
681
674
}
682
675
683
676
private void maybeAddHeader (Headers kafkaHeaders , String header , byte [] value , HeadersToAdd hta ) {
684
- if (this .whichHeaders .get (hta . ordinal () )
677
+ if (this .whichHeaders .contains (hta )
685
678
&& (this .appendOriginalHeaders || kafkaHeaders .lastHeader (header ) == null )) {
686
679
kafkaHeaders .add (header , value );
687
680
}
@@ -713,7 +706,7 @@ private void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception,
713
706
}
714
707
715
708
private void appendOrReplace (Headers headers , RecordHeader header , HeadersToAdd hta ) {
716
- if (this .whichHeaders .get (hta . ordinal () )) {
709
+ if (this .whichHeaders .contains (hta )) {
717
710
if (this .stripPreviousExceptionHeaders ) {
718
711
headers .remove (header .key ());
719
712
}
0 commit comments