Skip to content

Commit 2112c8d

Browse files
rhryngaryrussell
rhryn
authored andcommitted
Log Uncommitted After Rebalance
When commits a retryable, some partitions may have been assigned to another instance, in which case, those offsets can't be committed. Log the offsets that could not be committed at WARN level. Changed log level to improve troubleshooting Changed log level to improve troubleshooting Changed log level to improve troubleshooting Fixed formatting
1 parent 0c65c9b commit 2112c8d

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -1550,7 +1550,15 @@ private void checkRebalanceCommits() {
15501550
Map<TopicPartition, OffsetAndMetadata> commits = this.commitsDuringRebalance.entrySet()
15511551
.stream()
15521552
.filter(entry -> this.assignedPartitions.contains(entry.getKey()))
1553-
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
1553+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
1554+
1555+
Map<TopicPartition, OffsetAndMetadata> uncommitted = this.commitsDuringRebalance.entrySet()
1556+
.stream()
1557+
.filter(entry -> !this.assignedPartitions.contains(entry.getKey()))
1558+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
1559+
this.logger.warn(() -> "These offsets could not be committed; partition(s) lost during rebalance: "
1560+
+ uncommitted);
1561+
15541562
this.commitsDuringRebalance.clear();
15551563
this.logger.debug(() -> "Commit list: " + commits);
15561564
commitSync(commits);

0 commit comments

Comments
 (0)