Skip to content

Commit de82200

Browse files
committed
Reuse FrozenIndexInput.writeCacheFile method (elastic#70545)
Makes it easier for subsequent PRs that change the core write logic of FrozenIndexInput as there is just a single write path
1 parent fa544bd commit de82200

File tree

1 file changed

+35
-49
lines changed
  • x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache

1 file changed

+35
-49
lines changed

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java

Lines changed: 35 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -229,30 +229,14 @@ protected void doReadInternal(ByteBuffer b) throws IOException {
229229
(channel, channelPos, relativePos, len, progressUpdater) -> {
230230
assert len <= cachedBlob.to() - cachedBlob.from();
231231
final long startTimeNanos = stats.currentTimeNanos();
232-
final BytesRefIterator iterator = cachedBlob.bytes()
233-
.slice(toIntBytes(relativePos), toIntBytes(len))
234-
.iterator();
235-
long writePosition = channelPos;
236-
long bytesCopied = 0L;
237-
BytesRef current;
238-
while ((current = iterator.next()) != null) {
239-
final ByteBuffer byteBuffer = ByteBuffer.wrap(current.bytes, current.offset, current.length);
240-
while (byteBuffer.remaining() > 0) {
241-
final long bytesWritten = positionalWrite(channel, writePosition, byteBuffer);
242-
bytesCopied += bytesWritten;
243-
writePosition += bytesWritten;
244-
progressUpdater.accept(bytesCopied);
245-
}
246-
}
247-
long channelTo = channelPos + len;
248-
assert writePosition == channelTo : writePosition + " vs " + channelTo;
249-
final long endTimeNanos = stats.currentTimeNanos();
250-
stats.addCachedBytesWritten(len, endTimeNanos - startTimeNanos);
251-
logger.trace(
252-
"copied bytes [{}-{}] of file [{}] from cache index to disk",
232+
writeCacheFile(
233+
channel,
234+
cachedBlob.bytes().streamInput(),
235+
channelPos,
253236
relativePos,
254-
relativePos + len,
255-
fileName
237+
len,
238+
progressUpdater,
239+
startTimeNanos
256240
);
257241
},
258242
directory.cacheFetchAsyncExecutor()
@@ -309,14 +293,14 @@ protected void doReadInternal(ByteBuffer b) throws IOException {
309293
luceneByteBufLock,
310294
stopAsyncReads
311295
),
312-
(channel, channelPos, relativePos, len, progressUpdater) -> this.writeCacheFile(
313-
channel,
314-
channelPos,
315-
relativePos,
316-
len,
317-
rangeToWrite.start(),
318-
progressUpdater
319-
),
296+
(channel, channelPos, relativePos, len, progressUpdater) -> {
297+
final long startTimeNanos = stats.currentTimeNanos();
298+
final long streamStartPosition = rangeToWrite.start() + relativePos + compoundFileOffset;
299+
300+
try (InputStream input = openInputStreamFromBlobStore(streamStartPosition, len)) {
301+
this.writeCacheFile(channel, input, channelPos, relativePos, len, progressUpdater, startTimeNanos);
302+
}
303+
},
320304
directory.cacheFetchAsyncExecutor()
321305
);
322306

@@ -589,17 +573,17 @@ private int readCacheFile(
589573

590574
private void writeCacheFile(
591575
final SharedBytes.IO fc,
592-
long fileChannelPos,
593-
long relativePos,
594-
long length,
595-
long logicalPos,
596-
final Consumer<Long> progressUpdater
576+
final InputStream input,
577+
final long fileChannelPos,
578+
final long relativePos,
579+
final long length,
580+
final Consumer<Long> progressUpdater,
581+
final long startTimeNanos
597582
) throws IOException {
598583
assert assertCurrentThreadMayWriteCacheFile();
599584
logger.trace(
600-
"{}: writing logical {} channel {} pos {} length {} (details: {})",
585+
"{}: writing channel {} pos {} length {} (details: {})",
601586
fileInfo.physicalName(),
602-
logicalPos,
603587
fileChannelPos,
604588
relativePos,
605589
length,
@@ -611,19 +595,21 @@ private void writeCacheFile(
611595

612596
long bytesCopied = 0L;
613597
long remaining = length;
614-
final long startTimeNanos = stats.currentTimeNanos();
615-
try (InputStream input = openInputStreamFromBlobStore(logicalPos + relativePos + compoundFileOffset, length)) {
616-
while (remaining > 0L) {
617-
final int bytesRead = readSafe(input, copyBuffer, relativePos, end, remaining, frozenCacheFile);
618-
positionalWrite(fc, fileChannelPos + bytesCopied, ByteBuffer.wrap(copyBuffer, 0, bytesRead));
619-
bytesCopied += bytesRead;
620-
remaining -= bytesRead;
621-
progressUpdater.accept(bytesCopied);
598+
while (remaining > 0L) {
599+
final int bytesRead = readSafe(input, copyBuffer, relativePos, end, remaining, frozenCacheFile);
600+
final ByteBuffer byteBuffer = ByteBuffer.wrap(copyBuffer, 0, bytesRead);
601+
int writePosition = 0;
602+
while (byteBuffer.remaining() > 0) {
603+
final long bytesWritten = positionalWrite(fc, fileChannelPos + bytesCopied + writePosition, byteBuffer);
604+
writePosition += bytesWritten;
622605
}
623-
final long endTimeNanos = stats.currentTimeNanos();
624-
assert bytesCopied == length;
625-
stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos);
606+
bytesCopied += bytesRead;
607+
remaining -= bytesRead;
608+
progressUpdater.accept(bytesCopied);
626609
}
610+
final long endTimeNanos = stats.currentTimeNanos();
611+
assert bytesCopied == length;
612+
stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos);
627613
}
628614

629615
@Override

0 commit comments

Comments
 (0)