Skip to content

Commit c155336

Browse files
GH-3638: Fixes bug caused by race condition during handleAsyncFailure()
Fixes: #3638 Issue: #3638
1 parent efbd396 commit c155336

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -1466,7 +1466,14 @@ protected void pollAndInvoke() {
14661466

14671467
protected void handleAsyncFailure() {
14681468
List<FailedRecordTuple<K, V>> copyFailedRecords = new ArrayList<>(this.failedRecords);
1469-
this.failedRecords.clear();
1469+
1470+
// If we use failedRecords.clear() to remove copied record from failed records,
1471+
// We may encounter race condition during this operation.
1472+
// Other, the thread which execute this block, may miss one failed record.
1473+
int capturedRecordsCount = copyFailedRecords.size();
1474+
for (int i = 0; i < capturedRecordsCount; i++) {
1475+
this.failedRecords.pollFirst();
1476+
}
14701477

14711478
// If any copied and failed record fails to complete due to an unexpected error,
14721479
// We will give up on retrying with the remaining copied and failed Records.

0 commit comments

Comments
 (0)