Skip to content

Commit 5836a6a

Browse files
committed
fix(stream): release FetchResults if the subsequent fetch fails
Signed-off-by: Ning Yu <[email protected]>
1 parent 1e217ee commit 5836a6a

File tree

1 file changed

+5
-0
lines changed

1 file changed

+5
-0
lines changed

Diff for: core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java

+5
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ private CompletableFuture<LinkedList<FetchResult>> fetch0(FetchContext context,
159159
} else {
160160
LOGGER.error("Invalid record batch, last offset {} is less than next offset {}",
161161
recordBatchWithContext.lastOffset(), nextFetchOffset);
162+
rst.free();
162163
throw new IllegalStateException();
163164
}
164165
readSize += recordBatchWithContext.rawPayload().remaining();
@@ -168,6 +169,10 @@ private CompletableFuture<LinkedList<FetchResult>> fetch0(FetchContext context,
168169
// add to first since we need to reverse the order.
169170
rstList.addFirst(rst);
170171
return rstList;
172+
}).whenComplete((r, e) -> {
173+
if (e != null) {
174+
rst.free();
175+
}
171176
});
172177
});
173178
}

0 commit comments

Comments
 (0)