@@ -152,6 +152,10 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
152
152
153
153
private static final String UNUSED = "unused" ;
154
154
155
+ private static final String UNCHECKED = "unchecked" ;
156
+
157
+ private static final String RAWTYPES = "rawtypes" ;
158
+
155
159
private static final int DEFAULT_ACK_TIME = 5000 ;
156
160
157
161
private static final Map <String , Object > CONSUMER_CONFIG_DEFAULTS = ConsumerConfig .configDef ().defaultValues ();
@@ -547,12 +551,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
547
551
548
552
private static final String ERROR_HANDLER_THREW_AN_EXCEPTION = "Error handler threw an exception" ;
549
553
550
- private static final String UNCHECKED = "unchecked" ;
551
-
552
- private static final String RAWTYPES = "rawtypes" ;
553
-
554
- private static final String RAW_TYPES = RAWTYPES ;
555
-
556
554
private final LogAccessor logger = KafkaMessageListenerContainer .this .logger ; // NOSONAR hide
557
555
558
556
private final ContainerProperties containerProperties = getContainerProperties ();
@@ -611,7 +609,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
611
609
612
610
private final PlatformTransactionManager transactionManager = this .containerProperties .getTransactionManager ();
613
611
614
- @ SuppressWarnings (RAW_TYPES )
612
+ @ SuppressWarnings (RAWTYPES )
615
613
private final KafkaAwareTransactionManager kafkaTxManager =
616
614
this .transactionManager instanceof KafkaAwareTransactionManager
617
615
? ((KafkaAwareTransactionManager ) this .transactionManager ) : null ;
@@ -708,10 +706,10 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
708
706
709
707
private final Map <TopicPartition , Boolean > wasIdlePartition ;
710
708
711
- private final byte [] listenerinfo = getListenerInfo ();
712
-
713
709
private final Header infoHeader = new RecordHeader (KafkaHeaders .LISTENER_INFO , this .listenerinfo );
714
710
711
+ private final byte [] listenerinfo = getListenerInfo ();
712
+
715
713
private final Set <TopicPartition > pausedForNack = new HashSet <>();
716
714
717
715
private Map <TopicPartition , OffsetMetadata > definedPartitions ;
@@ -1666,16 +1664,20 @@ private void pausePartitionsIfNecessary() {
1666
1664
}
1667
1665
1668
1666
private void resumePartitionsIfNecessary () {
1669
- List <TopicPartition > partitionsToResume = getAssignedPartitions ()
1670
- .stream ()
1671
- .filter (tp -> !isPartitionPauseRequested (tp )
1672
- && this .pausedPartitions .contains (tp ))
1673
- .collect (Collectors .toList ());
1674
- if (partitionsToResume .size () > 0 ) {
1675
- this .consumer .resume (partitionsToResume );
1676
- this .pausedPartitions .removeAll (partitionsToResume );
1677
- this .logger .debug (() -> "Resumed consumption from " + partitionsToResume );
1678
- partitionsToResume .forEach (KafkaMessageListenerContainer .this ::publishConsumerPartitionResumedEvent );
1667
+ Collection <TopicPartition > assigned = getAssignedPartitions ();
1668
+ if (assigned != null ) {
1669
+ List <TopicPartition > partitionsToResume = assigned
1670
+ .stream ()
1671
+ .filter (tp -> !isPartitionPauseRequested (tp )
1672
+ && this .pausedPartitions .contains (tp ))
1673
+ .collect (Collectors .toList ());
1674
+ if (partitionsToResume .size () > 0 ) {
1675
+ this .consumer .resume (partitionsToResume );
1676
+ this .pausedPartitions .removeAll (partitionsToResume );
1677
+ this .logger .debug (() -> "Resumed consumption from " + partitionsToResume );
1678
+ partitionsToResume
1679
+ .forEach (KafkaMessageListenerContainer .this ::publishConsumerPartitionResumedEvent );
1680
+ }
1679
1681
}
1680
1682
}
1681
1683
@@ -1988,7 +1990,7 @@ private void invokeBatchListener(final ConsumerRecords<K, V> recordsArg) {
1988
1990
}
1989
1991
}
1990
1992
1991
- @ SuppressWarnings (RAW_TYPES )
1993
+ @ SuppressWarnings (RAWTYPES )
1992
1994
private void invokeBatchListenerInTx (final ConsumerRecords <K , V > records ,
1993
1995
@ Nullable final List <ConsumerRecord <K , V >> recordList ) {
1994
1996
@@ -2299,7 +2301,7 @@ private void invokeRecordListener(final ConsumerRecords<K, V> records) {
2299
2301
* Invoke the listener with each record in a separate transaction.
2300
2302
* @param records the records.
2301
2303
*/
2302
- @ SuppressWarnings (RAW_TYPES ) // NOSONAR complexity
2304
+ @ SuppressWarnings (RAWTYPES ) // NOSONAR complexity
2303
2305
private void invokeRecordListenerInTx (final ConsumerRecords <K , V > records ) {
2304
2306
Iterator <ConsumerRecord <K , V >> iterator = records .iterator ();
2305
2307
while (iterator .hasNext ()) {
@@ -2485,7 +2487,10 @@ private void pauseForNackSleep() {
2485
2487
this .nackWake = System .currentTimeMillis () + this .nackSleep ;
2486
2488
this .nackSleep = -1 ;
2487
2489
Set <TopicPartition > alreadyPaused = this .consumer .paused ();
2488
- this .pausedForNack .addAll (getAssignedPartitions ());
2490
+ Collection <TopicPartition > assigned = getAssignedPartitions ();
2491
+ if (assigned != null ) {
2492
+ this .pausedForNack .addAll (assigned );
2493
+ }
2489
2494
this .pausedForNack .removeAll (alreadyPaused );
2490
2495
this .logger .debug (() -> "Pausing for nack sleep: " + ListenerConsumer .this .pausedForNack );
2491
2496
try {
0 commit comments