Skip to content

Commit e20f16e

Browse files
authored
Better skip bytes in FsBlobContainer (#54333)
This committ is addressing a // NORELEASE tag in FsBlobContainer where the InputStream opened by the readBlob(String) does not efficiently skip bytes under hood. This method uses the Files.newInputStream(resolvedPath) which uses the Files.newByteChannel() which returns a SeekableByteChannel instance that allows to efficiently skip bytes but sadly this channel is then wrapped using Channels.newInputStream() into a ChannelInputStream instance that delegates the skip() method execution to the default InputStream implementation which reads bytes. This commit changes the implementation of the readBlob(String, long, long) method to use a more effective manner to skip bytes and adds a test to verify that only the required number of bytes are read. It also swaps how the buffered and limit streams are combined so that the underlying stream is limited first and the buffered stream will not try to buffer more bytes than necessary.
1 parent e669f14 commit e20f16e

File tree

2 files changed

+121
-5
lines changed

2 files changed

+121
-5
lines changed

server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import java.io.IOException;
3535
import java.io.InputStream;
3636
import java.io.OutputStream;
37+
import java.nio.channels.Channels;
38+
import java.nio.channels.SeekableByteChannel;
3739
import java.nio.file.DirectoryStream;
3840
import java.nio.file.FileAlreadyExistsException;
3941
import java.nio.file.FileVisitResult;
@@ -142,22 +144,28 @@ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOExce
142144
IOUtils.rm(blobNames.stream().map(path::resolve).toArray(Path[]::new));
143145
}
144146

147+
private InputStream bufferedInputStream(InputStream inputStream) {
148+
return new BufferedInputStream(inputStream, blobStore.bufferSizeInBytes());
149+
}
150+
145151
@Override
146152
public InputStream readBlob(String name) throws IOException {
147153
final Path resolvedPath = path.resolve(name);
148154
try {
149-
return new BufferedInputStream(Files.newInputStream(resolvedPath), blobStore.bufferSizeInBytes());
155+
return bufferedInputStream(Files.newInputStream(resolvedPath));
150156
} catch (FileNotFoundException fnfe) {
151157
throw new NoSuchFileException("[" + name + "] blob not found");
152158
}
153159
}
154160

155161
@Override
156162
public InputStream readBlob(String blobName, long position, long length) throws IOException {
157-
final InputStream inputStream = readBlob(blobName);
158-
long skipped = inputStream.skip(position); // NORELEASE
159-
assert skipped == position;
160-
return org.elasticsearch.common.io.Streams.limitStream(inputStream, length);
163+
final SeekableByteChannel channel = Files.newByteChannel(path.resolve(blobName));
164+
if (position > 0L) {
165+
channel.position(position);
166+
}
167+
assert channel.position() == position;
168+
return bufferedInputStream(org.elasticsearch.common.io.Streams.limitStream(Channels.newInputStream(channel), length));
161169
}
162170

163171
@Override

server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,78 @@
1818
*/
1919
package org.elasticsearch.common.blobstore.fs;
2020

21+
import org.apache.lucene.mockfile.FilterFileSystemProvider;
22+
import org.apache.lucene.mockfile.FilterSeekableByteChannel;
23+
import org.apache.lucene.util.LuceneTestCase;
24+
import org.elasticsearch.common.blobstore.BlobPath;
25+
import org.elasticsearch.common.io.PathUtils;
26+
import org.elasticsearch.common.io.PathUtilsForTesting;
27+
import org.elasticsearch.common.io.Streams;
28+
import org.elasticsearch.common.settings.Settings;
29+
import org.elasticsearch.core.internal.io.IOUtils;
2130
import org.elasticsearch.test.ESTestCase;
31+
import org.junit.After;
32+
import org.junit.Before;
33+
34+
import java.io.FilterInputStream;
35+
import java.io.IOException;
36+
import java.io.InputStream;
37+
import java.nio.ByteBuffer;
38+
import java.nio.channels.SeekableByteChannel;
39+
import java.nio.file.FileSystem;
40+
import java.nio.file.Files;
41+
import java.nio.file.OpenOption;
42+
import java.nio.file.Path;
43+
import java.nio.file.attribute.FileAttribute;
44+
import java.nio.file.spi.FileSystemProvider;
45+
import java.util.Locale;
46+
import java.util.Set;
47+
import java.util.concurrent.atomic.AtomicLong;
48+
import java.util.function.Consumer;
2249

2350
import static org.hamcrest.Matchers.containsString;
51+
import static org.hamcrest.Matchers.equalTo;
2452
import static org.hamcrest.Matchers.is;
2553
import static org.hamcrest.Matchers.startsWith;
2654

55+
@LuceneTestCase.SuppressFileSystems("*") // we do our own mocking
2756
public class FsBlobContainerTests extends ESTestCase {
2857

58+
final AtomicLong totalBytesRead = new AtomicLong(0);
59+
FileSystem fileSystem = null;
60+
61+
@Before
62+
public void setupMockFileSystems() {
63+
FileSystemProvider fileSystemProvider = new MockFileSystemProvider(PathUtils.getDefaultFileSystem(), totalBytesRead::addAndGet);
64+
fileSystem = fileSystemProvider.getFileSystem(null);
65+
PathUtilsForTesting.installMock(fileSystem); // restored by restoreFileSystem in ESTestCase
66+
}
67+
68+
@After
69+
public void closeMockFileSystems() throws IOException {
70+
IOUtils.close(fileSystem);
71+
}
72+
73+
public void testReadBlobRangeCorrectlySkipBytes() throws IOException {
74+
final String blobName = randomAlphaOfLengthBetween(1, 20).toLowerCase(Locale.ROOT);
75+
final byte[] blobData = randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
76+
77+
final Path path = PathUtils.get(createTempDir().toString());
78+
Files.write(path.resolve(blobName), blobData);
79+
80+
final FsBlobContainer container = new FsBlobContainer(new FsBlobStore(Settings.EMPTY, path, false), BlobPath.cleanPath(), path);
81+
assertThat(totalBytesRead.get(), equalTo(0L));
82+
83+
final long start = randomLongBetween(0L, Math.max(0L, blobData.length - 1));
84+
final long length = randomLongBetween(1L, blobData.length - start);
85+
86+
try (InputStream stream = container.readBlob(blobName, start, length)) {
87+
assertThat(totalBytesRead.get(), equalTo(0L));
88+
assertThat(Streams.consumeFully(stream), equalTo(length));
89+
assertThat(totalBytesRead.get(), equalTo(length));
90+
}
91+
}
92+
2993
public void testTempBlobName() {
3094
final String blobName = randomAlphaOfLengthBetween(1, 20);
3195
final String tempBlobName = FsBlobContainer.tempBlobName(blobName);
@@ -37,4 +101,48 @@ public void testIsTempBlobName() {
37101
final String tempBlobName = FsBlobContainer.tempBlobName(randomAlphaOfLengthBetween(1, 20));
38102
assertThat(FsBlobContainer.isTempBlobName(tempBlobName), is(true));
39103
}
104+
105+
static class MockFileSystemProvider extends FilterFileSystemProvider {
106+
107+
final Consumer<Long> onRead;
108+
109+
MockFileSystemProvider(FileSystem inner, Consumer<Long> onRead) {
110+
super("mockfs://", inner);
111+
this.onRead = onRead;
112+
}
113+
114+
private int onRead(int read) {
115+
if (read != -1) {
116+
onRead.accept((long) read);
117+
}
118+
return read;
119+
}
120+
121+
@Override
122+
public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> opts, FileAttribute<?>... attrs) throws IOException {
123+
return new FilterSeekableByteChannel(super.newByteChannel(path, opts, attrs)) {
124+
@Override
125+
public int read(ByteBuffer dst) throws IOException {
126+
return onRead(super.read(dst));
127+
}
128+
};
129+
}
130+
131+
@Override
132+
public InputStream newInputStream(Path path, OpenOption... opts) throws IOException {
133+
// no super.newInputStream(path, opts) as it will use the delegating FileSystem to open a SeekableByteChannel
134+
// and instead we want the mocked newByteChannel() method to be used
135+
return new FilterInputStream(delegate.newInputStream(path, opts)) {
136+
@Override
137+
public int read() throws IOException {
138+
return onRead(super.read());
139+
}
140+
141+
@Override
142+
public int read(byte[] b, int off, int len) throws IOException {
143+
return onRead(super.read(b, off, len));
144+
}
145+
};
146+
}
147+
}
40148
}

0 commit comments

Comments
 (0)