Skip to content

Commit e419d91

Browse files
authored
Add ranged readBlob to S3BlobContainer (#51137)
Implements InputStream readBlob(final String blobName, final long position, final int length) on the S3BlobContainer.
1 parent 4ff6c79 commit e419d91

File tree

6 files changed

+320
-19
lines changed

6 files changed

+320
-19
lines changed

plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
4848
import org.elasticsearch.common.collect.Tuple;
4949

50+
import java.io.ByteArrayInputStream;
5051
import java.io.IOException;
5152
import java.io.InputStream;
5253
import java.util.ArrayList;
@@ -86,6 +87,21 @@ public InputStream readBlob(String blobName) throws IOException {
8687
return new S3RetryingInputStream(blobStore, buildKey(blobName));
8788
}
8889

90+
@Override
91+
public InputStream readBlob(String blobName, long position, int length) throws IOException {
92+
if (position < 0L) {
93+
throw new IllegalArgumentException("position must be non-negative");
94+
}
95+
if (length < 0) {
96+
throw new IllegalArgumentException("length must be non-negative");
97+
}
98+
if (length == 0) {
99+
return new ByteArrayInputStream(new byte[0]);
100+
} else {
101+
return new S3RetryingInputStream(blobStore, buildKey(blobName), position, Math.addExact(position, length - 1));
102+
}
103+
}
104+
89105
/**
90106
* This implementation ignores the failIfAlreadyExists flag as the S3 API has no way to enforce this due to its weak consistency model.
91107
*/

plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.logging.log4j.LogManager;
2626
import org.apache.logging.log4j.Logger;
2727
import org.apache.logging.log4j.message.ParameterizedMessage;
28+
import org.elasticsearch.common.io.Streams;
2829
import org.elasticsearch.core.internal.io.IOUtils;
2930
import org.elasticsearch.Version;
3031

@@ -49,6 +50,8 @@ class S3RetryingInputStream extends InputStream {
4950

5051
private final S3BlobStore blobStore;
5152
private final String blobKey;
53+
private final long start;
54+
private final long end;
5255
private final int maxAttempts;
5356

5457
private InputStream currentStream;
@@ -58,17 +61,32 @@ class S3RetryingInputStream extends InputStream {
5861
private boolean closed;
5962

6063
S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException {
64+
this(blobStore, blobKey, 0, Long.MAX_VALUE - 1);
65+
}
66+
67+
// both start and end are inclusive bounds, following the definition in GetObjectRequest.setRange
68+
S3RetryingInputStream(S3BlobStore blobStore, String blobKey, long start, long end) throws IOException {
69+
if (start < 0L) {
70+
throw new IllegalArgumentException("start must be non-negative");
71+
}
72+
if (end < start || end == Long.MAX_VALUE) {
73+
throw new IllegalArgumentException("end must be >= start and not Long.MAX_VALUE");
74+
}
6175
this.blobStore = blobStore;
6276
this.blobKey = blobKey;
6377
this.maxAttempts = blobStore.getMaxRetries() + 1;
78+
this.start = start;
79+
this.end = end;
6480
currentStream = openStream();
6581
}
6682

6783
private InputStream openStream() throws IOException {
6884
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
6985
final GetObjectRequest getObjectRequest = new GetObjectRequest(blobStore.bucket(), blobKey);
70-
if (currentOffset > 0) {
71-
getObjectRequest.setRange(currentOffset);
86+
if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) {
87+
assert start + currentOffset <= end :
88+
"requesting beyond end, start = " + start + " offset=" + currentOffset + " end=" + end;
89+
getObjectRequest.setRange(Math.addExact(start, currentOffset), end);
7290
}
7391
final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
7492
return s3Object.getObjectContent();
@@ -122,20 +140,32 @@ private void ensureOpen() {
122140

123141
private void reopenStreamOrFail(IOException e) throws IOException {
124142
if (attempt >= maxAttempts) {
143+
logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], giving up",
144+
blobStore.bucket(), blobKey, start + currentOffset, attempt, maxAttempts), e);
125145
throw addSuppressedExceptions(e);
126146
}
127147
logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying",
128-
blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e);
148+
blobStore.bucket(), blobKey, start + currentOffset, attempt, maxAttempts), e);
129149
attempt += 1;
130150
if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
131151
failures.add(e);
132152
}
153+
try {
154+
Streams.consumeFully(currentStream);
155+
} catch (Exception e2) {
156+
logger.trace("Failed to fully consume stream on close", e);
157+
}
133158
IOUtils.closeWhileHandlingException(currentStream);
134159
currentStream = openStream();
135160
}
136161

137162
@Override
138163
public void close() throws IOException {
164+
try {
165+
Streams.consumeFully(currentStream);
166+
} catch (Exception e) {
167+
logger.trace("Failed to fully consume stream on close", e);
168+
}
139169
currentStream.close();
140170
closed = true;
141171
}

plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 151 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.blobstore.BlobContainer;
3333
import org.elasticsearch.common.blobstore.BlobPath;
3434
import org.elasticsearch.common.bytes.BytesReference;
35+
import org.elasticsearch.common.collect.Tuple;
3536
import org.elasticsearch.common.io.Streams;
3637
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
3738
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
@@ -58,6 +59,7 @@
5859
import java.util.Arrays;
5960
import java.util.Locale;
6061
import java.util.Objects;
62+
import java.util.Optional;
6163
import java.util.concurrent.atomic.AtomicBoolean;
6264
import java.util.concurrent.atomic.AtomicInteger;
6365
import java.util.concurrent.atomic.AtomicLong;
@@ -70,17 +72,22 @@
7072
import static org.elasticsearch.repositories.s3.S3ClientSettings.READ_TIMEOUT_SETTING;
7173
import static org.hamcrest.Matchers.anyOf;
7274
import static org.hamcrest.Matchers.containsString;
75+
import static org.hamcrest.Matchers.either;
7376
import static org.hamcrest.Matchers.equalTo;
77+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
7478
import static org.hamcrest.Matchers.instanceOf;
7579
import static org.hamcrest.Matchers.is;
7680
import static org.hamcrest.Matchers.lessThan;
81+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
7782

7883
/**
7984
* This class tests how a {@link S3BlobContainer} and its underlying AWS S3 client are retrying requests when reading or writing blobs.
8085
*/
8186
@SuppressForbidden(reason = "use a http server")
8287
public class S3BlobContainerRetriesTests extends ESTestCase {
8388

89+
private static final long MAX_RANGE_VAL = Long.MAX_VALUE - 1;
90+
8491
private HttpServer httpServer;
8592
private S3Service service;
8693

@@ -139,8 +146,19 @@ private BlobContainer createBlobContainer(final @Nullable Integer maxRetries,
139146

140147
public void testReadNonexistentBlobThrowsNoSuchFileException() {
141148
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null);
142-
final Exception exception = expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob("read_nonexistent_blob"));
149+
final long position = randomLongBetween(0, MAX_RANGE_VAL);
150+
final int length = randomIntBetween(0, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position)));
151+
final Exception exception = expectThrows(NoSuchFileException.class,
152+
() -> {
153+
if (randomBoolean()) {
154+
blobContainer.readBlob("read_nonexistent_blob");
155+
} else {
156+
blobContainer.readBlob("read_nonexistent_blob", 0, 1);
157+
}
158+
});
143159
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found"));
160+
assertThat(expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob("read_nonexistent_blob", position, length))
161+
.getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found"));
144162
}
145163

146164
public void testReadBlobWithRetries() throws Exception {
@@ -153,6 +171,7 @@ public void testReadBlobWithRetries() throws Exception {
153171
if (countDown.countDown()) {
154172
final int rangeStart = getRangeStart(exchange);
155173
assertThat(rangeStart, lessThan(bytes.length));
174+
assertEquals(Optional.empty(), getRangeEnd(exchange));
156175
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
157176
exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart);
158177
exchange.getResponseBody().write(bytes, rangeStart, bytes.length - rangeStart);
@@ -173,8 +192,85 @@ public void testReadBlobWithRetries() throws Exception {
173192
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 500));
174193
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
175194
try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries")) {
176-
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
177-
assertThat(countDown.isCountedDown(), is(true));
195+
final int readLimit;
196+
final InputStream wrappedStream;
197+
if (randomBoolean()) {
198+
// read stream only partly
199+
readLimit = randomIntBetween(0, bytes.length);
200+
wrappedStream = Streams.limitStream(inputStream, readLimit);
201+
} else {
202+
readLimit = bytes.length;
203+
wrappedStream = inputStream;
204+
}
205+
final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(wrappedStream));
206+
logger.info("maxRetries={}, readLimit={}, byteSize={}, bytesRead={}",
207+
maxRetries, readLimit, bytes.length, bytesRead.length);
208+
assertArrayEquals(Arrays.copyOfRange(bytes, 0, readLimit), bytesRead);
209+
if (readLimit < bytes.length) {
210+
// we might have completed things based on an incomplete response, and we're happy with that
211+
} else {
212+
assertTrue(countDown.isCountedDown());
213+
}
214+
}
215+
}
216+
217+
public void testReadRangeBlobWithRetries() throws Exception {
218+
final int maxRetries = randomInt(5);
219+
final CountDown countDown = new CountDown(maxRetries + 1);
220+
221+
final byte[] bytes = randomBlobContent();
222+
httpServer.createContext("/bucket/read_range_blob_max_retries", exchange -> {
223+
Streams.readFully(exchange.getRequestBody());
224+
if (countDown.countDown()) {
225+
final int rangeStart = getRangeStart(exchange);
226+
assertThat(rangeStart, lessThan(bytes.length));
227+
assertTrue(getRangeEnd(exchange).isPresent());
228+
final int rangeEnd = getRangeEnd(exchange).get();
229+
assertThat(rangeEnd, greaterThanOrEqualTo(rangeStart));
230+
// adapt range end to be compliant to https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
231+
final int effectiveRangeEnd = Math.min(bytes.length - 1, rangeEnd);
232+
final int length = (effectiveRangeEnd - rangeStart) + 1;
233+
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
234+
exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
235+
exchange.getResponseBody().write(bytes, rangeStart, length);
236+
exchange.close();
237+
return;
238+
}
239+
if (randomBoolean()) {
240+
exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY,
241+
HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
242+
} else if (randomBoolean()) {
243+
sendIncompleteContent(exchange, bytes);
244+
}
245+
if (randomBoolean()) {
246+
exchange.close();
247+
}
248+
});
249+
250+
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 500));
251+
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
252+
final int position = randomIntBetween(0, bytes.length - 1);
253+
final int length = randomIntBetween(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
254+
try (InputStream inputStream = blobContainer.readBlob("read_range_blob_max_retries", position, length)) {
255+
final int readLimit;
256+
final InputStream wrappedStream;
257+
if (randomBoolean()) {
258+
// read stream only partly
259+
readLimit = randomIntBetween(0, length);
260+
wrappedStream = Streams.limitStream(inputStream, readLimit);
261+
} else {
262+
readLimit = length;
263+
wrappedStream = inputStream;
264+
}
265+
final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(wrappedStream));
266+
logger.info("maxRetries={}, position={}, length={}, readLimit={}, byteSize={}, bytesRead={}",
267+
maxRetries, position, length, readLimit, bytes.length, bytesRead.length);
268+
assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + readLimit)), bytesRead);
269+
if (readLimit == 0 || (readLimit < length && readLimit == bytesRead.length)) {
270+
// we might have completed things based on an incomplete response, and we're happy with that
271+
} else {
272+
assertTrue(countDown.isCountedDown());
273+
}
178274
}
179275
}
180276

@@ -194,12 +290,18 @@ public void testReadBlobWithReadTimeouts() {
194290
final byte[] bytes = randomBlobContent();
195291
httpServer.createContext("/bucket/read_blob_incomplete", exchange -> sendIncompleteContent(exchange, bytes));
196292

197-
exception = expectThrows(SocketTimeoutException.class, () -> {
198-
try (InputStream stream = blobContainer.readBlob("read_blob_incomplete")) {
293+
final int position = randomIntBetween(0, bytes.length - 1);
294+
final int length = randomIntBetween(1, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
295+
exception = expectThrows(IOException.class, () -> {
296+
try (InputStream stream = randomBoolean() ?
297+
blobContainer.readBlob("read_blob_incomplete") :
298+
blobContainer.readBlob("read_blob_incomplete", position, length)) {
199299
Streams.readFully(stream);
200300
}
201301
});
202-
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
302+
assertThat(exception, either(instanceOf(SocketTimeoutException.class)).or(instanceOf(ConnectionClosedException.class)));
303+
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), either(containsString("read timed out")).or(
304+
containsString("premature end of chunk coded message body: closing chunk expected")));
203305
assertThat(exception.getSuppressed().length, equalTo(maxRetries));
204306
}
205307

@@ -209,7 +311,14 @@ public void testReadBlobWithNoHttpResponse() {
209311
// HTTP server closes connection immediately
210312
httpServer.createContext("/bucket/read_blob_no_response", HttpExchange::close);
211313

212-
Exception exception = expectThrows(SdkClientException.class, () -> blobContainer.readBlob("read_blob_no_response"));
314+
Exception exception = expectThrows(SdkClientException.class,
315+
() -> {
316+
if (randomBoolean()) {
317+
blobContainer.readBlob("read_blob_no_response");
318+
} else {
319+
blobContainer.readBlob("read_blob_no_response", 0, 1);
320+
}
321+
});
213322
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("the target server failed to respond"));
214323
assertThat(exception.getCause(), instanceOf(NoHttpResponseException.class));
215324
assertThat(exception.getSuppressed().length, equalTo(0));
@@ -227,12 +336,15 @@ public void testReadBlobWithPrematureConnectionClose() {
227336
});
228337

229338
final Exception exception = expectThrows(ConnectionClosedException.class, () -> {
230-
try (InputStream stream = blobContainer.readBlob("read_blob_incomplete")) {
339+
try (InputStream stream = randomBoolean() ?
340+
blobContainer.readBlob("read_blob_incomplete", 0, 1):
341+
blobContainer.readBlob("read_blob_incomplete")) {
231342
Streams.readFully(stream);
232343
}
233344
});
234345
assertThat(exception.getMessage().toLowerCase(Locale.ROOT),
235-
containsString("premature end of content-length delimited message body"));
346+
either(containsString("premature end of chunk coded message body: closing chunk expected"))
347+
.or(containsString("premature end of content-length delimited message body")));
236348
assertThat(exception.getSuppressed().length, equalTo(Math.min(S3RetryingInputStream.MAX_SUPPRESSED_EXCEPTIONS, maxRetries)));
237349
}
238350

@@ -397,23 +509,47 @@ private static byte[] randomBlobContent() {
397509
return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
398510
}
399511

400-
private static int getRangeStart(HttpExchange exchange) {
512+
private static Tuple<Long, Long> getRange(HttpExchange exchange) {
401513
final String rangeHeader = exchange.getRequestHeaders().getFirst("Range");
402514
if (rangeHeader == null) {
403-
return 0;
515+
return Tuple.tuple(0L, MAX_RANGE_VAL);
404516
}
405517

406-
final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-9223372036854775806$").matcher(rangeHeader);
518+
final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(rangeHeader);
407519
assertTrue(rangeHeader + " matches expected pattern", matcher.matches());
408-
return Math.toIntExact(Long.parseLong(matcher.group(1)));
520+
long rangeStart = Long.parseLong(matcher.group(1));
521+
long rangeEnd = Long.parseLong(matcher.group(2));
522+
assertThat(rangeStart, lessThanOrEqualTo(rangeEnd));
523+
return Tuple.tuple(rangeStart, rangeEnd);
524+
}
525+
526+
private static int getRangeStart(HttpExchange exchange) {
527+
return Math.toIntExact(getRange(exchange).v1());
528+
}
529+
530+
private static Optional<Integer> getRangeEnd(HttpExchange exchange) {
531+
final long rangeEnd = getRange(exchange).v2();
532+
if (rangeEnd == MAX_RANGE_VAL) {
533+
return Optional.empty();
534+
}
535+
return Optional.of(Math.toIntExact(rangeEnd));
409536
}
410537

411538
private static void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException {
412539
final int rangeStart = getRangeStart(exchange);
413540
assertThat(rangeStart, lessThan(bytes.length));
541+
final Optional<Integer> rangeEnd = getRangeEnd(exchange);
542+
final int length;
543+
if (rangeEnd.isPresent()) {
544+
// adapt range end to be compliant to https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
545+
final int effectiveRangeEnd = Math.min(rangeEnd.get(), bytes.length - 1);
546+
length = effectiveRangeEnd - rangeStart;
547+
} else {
548+
length = bytes.length - rangeStart - 1;
549+
}
414550
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
415-
exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart);
416-
final int bytesToSend = randomIntBetween(0, bytes.length - rangeStart - 1);
551+
exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
552+
final int bytesToSend = randomIntBetween(0, length - 1);
417553
if (bytesToSend > 0) {
418554
exchange.getResponseBody().write(bytes, rangeStart, bytesToSend);
419555
}

server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public InputStream readBlob(String blobName, long position, int length) throws I
157157
final InputStream inputStream = readBlob(blobName);
158158
long skipped = inputStream.skip(position); // NORELEASE
159159
assert skipped == position;
160-
return inputStream;
160+
return org.elasticsearch.common.io.Streams.limitStream(inputStream, length);
161161
}
162162

163163
@Override

0 commit comments

Comments
 (0)