
Commit f7e8b9a

fix(stream): release `FetchResult`s if the subsequent fetch fails (#2174)
fix(stream): release `FetchResult`s if the subsequent fetch fails (#2172)

* fix(stream): release `FetchResult`s if the subsequent fetch fails

* revert: "fix(stream): release `FetchResult`s if the subsequent fetch fails"

  This reverts commit 5836a6a.

* refactor: add the `FetchResult` into the list in order rather than in reverse order

* fix: release `FetchResult`s if failed to fetch

---------

Signed-off-by: Ning Yu <[email protected]>
1 parent bf8ebdb · commit f7e8b9a
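The pattern behind the fix can be shown in isolation. The sketch below is a minimal, hypothetical model of the change, not the project's API: `PooledResult`, `fetchOnce`, and `fetchAll` are stand-ins invented for illustration. The key move is that the recursion appends each pooled result to a caller-owned list *before* issuing the next fetch, so a `whenComplete` handler at the call site can free everything fetched so far when any later step fails.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReleaseOnFailureSketch {

    /** Stand-in for a result backed by a pooled buffer that must be freed explicitly. */
    interface PooledResult {
        void free();
    }

    /** One async fetch step: succeeds for the first few offsets, then fails. */
    static CompletableFuture<PooledResult> fetchOnce(long offset) {
        if (offset < 3) {
            return CompletableFuture.<PooledResult>completedFuture(
                () -> System.out.println("freed result for offset " + offset));
        }
        return CompletableFuture.failedFuture(new RuntimeException("fetch failed at offset " + offset));
    }

    /**
     * Fetch [startOffset, endOffset) step by step, appending each result to the
     * caller-provided list before recursing. Because the caller owns the list,
     * partial results stay reachable even when the returned future fails.
     */
    static CompletableFuture<Void> fetchAll(long startOffset, long endOffset, List<PooledResult> results) {
        if (startOffset >= endOffset) {
            return CompletableFuture.completedFuture(null);
        }
        return fetchOnce(startOffset).thenCompose(rst -> {
            results.add(rst); // record the result before the next fetch can fail
            return fetchAll(startOffset + 1, endOffset, results);
        });
    }

    public static void main(String[] args) {
        List<PooledResult> results = new LinkedList<>();
        fetchAll(0, 10, results)
            .whenComplete((nil, e) -> {
                if (e != null) {
                    // A later step failed: free everything fetched so far.
                    results.forEach(PooledResult::free);
                    results.clear();
                }
            })
            .exceptionally(e -> null) // the failure is expected in this demo
            .join();
    }
}
```

Running the sketch prints three `freed result …` lines: the failure at offset 3 triggers cleanup of the results fetched at offsets 0–2, which is exactly the leak the commit closes.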

File tree: 1 file changed (+23 −10 lines)

Diff for: core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java

```diff
@@ -140,17 +140,35 @@ private CompletableFuture<Records> readAll0(FetchContext context, long startOffs
         if (nextFetchOffset >= endOffset) {
             return CompletableFuture.completedFuture(MemoryRecords.EMPTY);
         }
-        return fetch0(context, nextFetchOffset, endOffset, maxSize)
-            .thenApply(rst -> PooledMemoryRecords.of(baseOffset, rst, context.readOptions().pooledBuf()));
+        List<FetchResult> results = new LinkedList<>();
+        return fetch0(context, nextFetchOffset, endOffset, maxSize, results)
+            .whenComplete((nil, e) -> {
+                if (e != null) {
+                    results.forEach(FetchResult::free);
+                    results.clear();
+                }
+            })
+            .thenApply(nil -> PooledMemoryRecords.of(baseOffset, results, context.readOptions().pooledBuf()));
     }
 
-    private CompletableFuture<LinkedList<FetchResult>> fetch0(FetchContext context, long startOffset, long endOffset, int maxSize) {
+    /**
+     * Fetch records from the {@link ElasticStreamSlice}
+     *
+     * @param context fetch context
+     * @param startOffset start offset
+     * @param endOffset end offset
+     * @param maxSize max size of the fetched records
+     * @param results result list to be filled
+     * @return a future that completes when reaching the end offset or the max size
+     */
+    private CompletableFuture<Void> fetch0(FetchContext context, long startOffset, long endOffset, int maxSize, List<FetchResult> results) {
         if (startOffset >= endOffset || maxSize <= 0) {
-            return CompletableFuture.completedFuture(new LinkedList<>());
+            return CompletableFuture.completedFuture(null);
         }
         int adjustedMaxSize = Math.min(maxSize, 1024 * 1024);
         return streamSlice.fetch(context, startOffset, endOffset, adjustedMaxSize)
             .thenCompose(rst -> {
+                results.add(rst);
                 long nextFetchOffset = startOffset;
                 int readSize = 0;
                 for (RecordBatchWithContext recordBatchWithContext : rst.recordBatchList()) {
@@ -163,12 +181,7 @@ private CompletableFuture<LinkedList<FetchResult>> fetch0(FetchContext context,
                     }
                     readSize += recordBatchWithContext.rawPayload().remaining();
                 }
-                return fetch0(context, nextFetchOffset, endOffset, maxSize - readSize)
-                    .thenApply(rstList -> {
-                        // add to first since we need to reverse the order.
-                        rstList.addFirst(rst);
-                        return rstList;
-                    });
+                return fetch0(context, nextFetchOffset, endOffset, maxSize - readSize, results);
             });
     }
```
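Two details of the diff are worth noting. First, the old `fetch0` returned its `LinkedList<FetchResult>` only on successful completion, so when a later recursive fetch failed, the `FetchResult`s already fetched were unreachable from the exceptional future and their pooled buffers leaked; passing a caller-owned `results` list down the recursion keeps them reachable on every path. Second, appending each result before recursing builds the list in fetch order, which removes the old `addFirst` reversal. The cleanup in `readAll0` relies on standard `CompletableFuture` semantics: `whenComplete` observes an exceptional result without swallowing it, so the subsequent `thenApply` stage (which builds `PooledMemoryRecords` from `results`) is skipped on the failure path and never sees the freed buffers. A tiny illustration of that semantics (hypothetical, not project code):

```java
CompletableFuture.failedFuture(new RuntimeException("boom"))
    .whenComplete((nil, e) -> {
        if (e != null) {
            System.out.println("cleanup runs: " + e.getMessage()); // prints
        }
    })
    .thenApply(nil -> {
        System.out.println("not reached: skipped on the failure path");
        return nil;
    });
```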

0 commit comments