Skip to content

Commit 83d9a31

Browse files
authored
KafkaMessageListenerContainer Code Cleanup
Fixes: #3079 * Add method consumerWakeIfNecessary() to reuse consumer.wakeIfNecessary(). * Add method getTxProducer(). * Optimize determineListenerType(). * Change OffsetMetadata type to record.
1 parent cd4341c commit 83d9a31

File tree

1 file changed

+30
-47
lines changed

1 file changed

+30
-47
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+30-47
Original file line numberDiff line numberDiff line change
@@ -305,8 +305,7 @@ public boolean isContainerPaused() {
305305

306306
@Override
307307
public boolean isPartitionPaused(TopicPartition topicPartition) {
308-
return this.listenerConsumer != null && this.listenerConsumer
309-
.isPartitionPaused(topicPartition);
308+
return this.listenerConsumer != null && this.listenerConsumer.isPartitionPaused(topicPartition);
310309
}
311310

312311
@Override
@@ -317,33 +316,28 @@ public boolean isInExpectedState() {
317316
@Override
318317
public void enforceRebalance() {
319318
this.thisOrParentContainer.enforceRebalanceRequested.set(true);
320-
KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
321-
if (consumer != null) {
322-
consumer.wakeIfNecessary();
323-
}
319+
consumerWakeIfNecessary();
324320
}
325321

326322
@Override
327323
public void pause() {
328324
super.pause();
329-
KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
330-
if (consumer != null) {
331-
consumer.wakeIfNecessary();
332-
}
325+
consumerWakeIfNecessary();
333326
}
334327

335328
@Override
336329
public void resume() {
337330
super.resume();
338-
KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
339-
if (consumer != null) {
340-
consumer.wakeIfNecessary();
341-
}
331+
consumerWakeIfNecessary();
342332
}
343333

344334
@Override
345335
public void resumePartition(TopicPartition topicPartition) {
346336
super.resumePartition(topicPartition);
337+
consumerWakeIfNecessary();
338+
}
339+
340+
private void consumerWakeIfNecessary() {
347341
KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
348342
if (consumer != null) {
349343
consumer.wakeIfNecessary();
@@ -422,15 +416,11 @@ private void checkAckMode(ContainerProperties containerProperties) {
422416
}
423417

424418
private ListenerType determineListenerType(GenericMessageListener<?> listener) {
425-
ListenerType listenerType = ListenerUtils.determineListenerType(listener);
426-
if (listener instanceof DelegatingMessageListener) {
427-
Object delegating = listener;
428-
while (delegating instanceof DelegatingMessageListener<?> dml) {
429-
delegating = dml.getDelegate();
430-
}
431-
listenerType = ListenerUtils.determineListenerType(delegating);
419+
Object delegating = listener;
420+
while (delegating instanceof DelegatingMessageListener<?> dml) {
421+
delegating = dml.getDelegate();
432422
}
433-
return listenerType;
423+
return ListenerUtils.determineListenerType(delegating);
434424
}
435425

436426
@Override
@@ -1586,7 +1576,7 @@ private void fixTxOffsetsIfNeeded() {
15861576
this.lastCommits.forEach((tp, oamd) -> {
15871577
long position = this.consumer.position(tp);
15881578
Long saved = this.savedPositions.get(tp);
1589-
if (saved != null && saved.longValue() != position) {
1579+
if (saved != null && saved != position) {
15901580
this.logger.debug(() -> "Skipping TX offset correction - seek(s) have been performed; "
15911581
+ "saved: " + this.savedPositions + ", "
15921582
+ "committed: " + oamd + ", "
@@ -1609,9 +1599,7 @@ private void fixTxOffsetsIfNeeded() {
16091599
}
16101600
else {
16111601
this.transactionTemplate.executeWithoutResult(status -> {
1612-
doSendOffsets(((KafkaResourceHolder) TransactionSynchronizationManager
1613-
.getResource(this.kafkaTxManager.getProducerFactory()))
1614-
.getProducer(), toFix);
1602+
doSendOffsets(getTxProducer(), toFix);
16151603
});
16161604
}
16171605
}
@@ -2195,9 +2183,7 @@ private void invokeBatchListenerInTx(final ConsumerRecords<K, V> records,
21952183
@Override
21962184
public void doInTransactionWithoutResult(TransactionStatus s) {
21972185
if (ListenerConsumer.this.kafkaTxManager != null) {
2198-
ListenerConsumer.this.producer = ((KafkaResourceHolder) TransactionSynchronizationManager
2199-
.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
2200-
.getProducer(); // NOSONAR nullable
2186+
ListenerConsumer.this.producer = getTxProducer();
22012187
}
22022188
RuntimeException aborted = doInvokeBatchListener(records, recordList);
22032189
if (aborted != null) {
@@ -2516,7 +2502,6 @@ private void invokeRecordListener(final ConsumerRecords<K, V> records) {
25162502
* Invoke the listener with each record in a separate transaction.
25172503
* @param records the records.
25182504
*/
2519-
@SuppressWarnings(RAWTYPES) // NOSONAR complexity
25202505
private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
25212506
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
25222507
while (iterator.hasNext()) {
@@ -2561,9 +2546,7 @@ private void invokeInTransaction(Iterator<ConsumerRecord<K, V>> iterator, final
25612546
@Override
25622547
public void doInTransactionWithoutResult(TransactionStatus s) {
25632548
if (ListenerConsumer.this.kafkaTxManager != null) {
2564-
ListenerConsumer.this.producer = ((KafkaResourceHolder) TransactionSynchronizationManager
2565-
.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
2566-
.getProducer(); // NOSONAR
2549+
ListenerConsumer.this.producer = getTxProducer();
25672550
}
25682551
RuntimeException aborted = doInvokeRecordListener(cRecord, iterator);
25692552
if (aborted != null) {
@@ -2755,6 +2738,13 @@ private void pauseForNackSleep() {
27552738
this.nackSleepDurationMillis = -1;
27562739
}
27572740

2741+
@SuppressWarnings(RAWTYPES)
2742+
private Producer<?, ?> getTxProducer() {
2743+
return ((KafkaResourceHolder) TransactionSynchronizationManager
2744+
.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
2745+
.getProducer(); // NOSONAR
2746+
}
2747+
27582748
/**
27592749
* Actually invoke the listener.
27602750
* @param cRecord the record.
@@ -3884,20 +3874,13 @@ private Long computeBackwardWhereTo(long offset, boolean toCurrent, TopicPartiti
38843874
}
38853875

38863876

3887-
private static final class OffsetMetadata {
3888-
3889-
final Long offset; // NOSONAR
3890-
3891-
final boolean relativeToCurrent; // NOSONAR
3892-
3893-
final SeekPosition seekPosition; // NOSONAR
3894-
3895-
OffsetMetadata(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) {
3896-
this.offset = offset;
3897-
this.relativeToCurrent = relativeToCurrent;
3898-
this.seekPosition = seekPosition;
3899-
}
3900-
3877+
/**
3878+
* Offset metadata record.
3879+
* @param offset current offset.
3880+
* @param relativeToCurrent relative to current.
3881+
* @param seekPosition seek position strategy.
3882+
*/
3883+
private record OffsetMetadata(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) {
39013884
}
39023885

39033886
private class StopCallback implements BiConsumer<Object, Throwable> {

0 commit comments

Comments
 (0)