Skip to content

Commit 6c08574

Browse files
Manually Manage Direct Write Buffer in Frozen Cache Writes (#70399)
Manually managing the buffer has a number of upsides for us and no direct downsides as far as I can see. * We limit the number of syscalls for large writes by a factor of up to `8` (in practice it's probably mostly `8` when writing larger amounts of data) * Since we limit the write thread count to 28 the amount of direct memory used is limited * We eliminate the copying from heap -> off-heap under the lock in the channel's positional write * We eliminate allocating `ByteBuffer` instances in a hot loop when writing large regions
1 parent ee0f116 commit 6c08574

File tree

1 file changed

+22
-8
lines changed
  • x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache

1 file changed

+22
-8
lines changed

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,11 @@ protected ByteRange maybeReadFromBlobCache(long position, int length) {
467467

468468
private static int positionalWrite(SharedBytes.IO fc, long start, ByteBuffer byteBuffer) throws IOException {
469469
assert assertCurrentThreadMayWriteCacheFile();
470-
return fc.write(byteBuffer, start);
470+
byteBuffer.flip();
471+
int written = fc.write(byteBuffer, start);
472+
assert byteBuffer.hasRemaining() == false;
473+
byteBuffer.clear();
474+
return written;
471475
}
472476

473477
/**
@@ -571,6 +575,13 @@ private int readCacheFile(
571575
return bytesRead;
572576
}
573577

578+
/**
579+
* Thread local direct byte buffer to aggregate multiple positional writes to the cache file.
580+
*/
581+
private static final ThreadLocal<ByteBuffer> writeBuffer = ThreadLocal.withInitial(
582+
() -> ByteBuffer.allocateDirect(COPY_BUFFER_SIZE * 8)
583+
);
584+
574585
private void writeCacheFile(
575586
final SharedBytes.IO fc,
576587
final InputStream input,
@@ -595,18 +606,21 @@ private void writeCacheFile(
595606

596607
long bytesCopied = 0L;
597608
long remaining = length;
609+
final ByteBuffer buf = writeBuffer.get();
610+
buf.clear();
598611
while (remaining > 0L) {
599612
final int bytesRead = readSafe(input, copyBuffer, relativePos, end, remaining, frozenCacheFile);
600-
final ByteBuffer byteBuffer = ByteBuffer.wrap(copyBuffer, 0, bytesRead);
601-
int writePosition = 0;
602-
while (byteBuffer.remaining() > 0) {
603-
final long bytesWritten = positionalWrite(fc, fileChannelPos + bytesCopied + writePosition, byteBuffer);
604-
writePosition += bytesWritten;
613+
if (bytesRead > buf.remaining()) {
614+
long bytesWritten = positionalWrite(fc, fileChannelPos + bytesCopied, buf);
615+
bytesCopied += bytesWritten;
616+
progressUpdater.accept(bytesCopied);
605617
}
606-
bytesCopied += bytesRead;
618+
buf.put(copyBuffer, 0, bytesRead);
607619
remaining -= bytesRead;
608-
progressUpdater.accept(bytesCopied);
609620
}
621+
long bytesWritten = positionalWrite(fc, fileChannelPos + bytesCopied, buf);
622+
bytesCopied += bytesWritten;
623+
progressUpdater.accept(bytesCopied);
610624
final long endTimeNanos = stats.currentTimeNanos();
611625
assert bytesCopied == length;
612626
stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos);

0 commit comments

Comments
 (0)