Skip to content

Commit 92fb003

Browse files
authored
Abort non-fully consumed S3 input stream (#62167)
Today, when an S3RetryingInputStream is closed, the remaining bytes that were not consumed are drained right before closing the underlying stream. In some contexts it might be more efficient to not consume the remaining bytes and just drop the connection. This is, for example, the case with the prewarming of snapshot-backed indices, where there is no point in reading potentially large blobs if we know that the cache file we want to write the content of the blob to has already been evicted. Draining all bytes here takes a slot in the prewarming thread pool for nothing.
1 parent b0eeeac commit 92fb003

File tree

3 files changed

+250
-17
lines changed

3 files changed

+250
-17
lines changed

plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java

Lines changed: 70 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@
2121
import com.amazonaws.AmazonClientException;
2222
import com.amazonaws.services.s3.model.AmazonS3Exception;
2323
import com.amazonaws.services.s3.model.GetObjectRequest;
24+
import com.amazonaws.services.s3.model.ObjectMetadata;
2425
import com.amazonaws.services.s3.model.S3Object;
26+
import com.amazonaws.services.s3.model.S3ObjectInputStream;
2527
import org.apache.logging.log4j.LogManager;
2628
import org.apache.logging.log4j.Logger;
2729
import org.apache.logging.log4j.message.ParameterizedMessage;
2830
import org.elasticsearch.Version;
29-
import org.elasticsearch.common.io.Streams;
3031
import org.elasticsearch.core.internal.io.IOUtils;
3132

3233
import java.io.IOException;
@@ -53,12 +54,14 @@ class S3RetryingInputStream extends InputStream {
5354
private final long start;
5455
private final long end;
5556
private final int maxAttempts;
57+
private final List<IOException> failures;
5658

57-
private InputStream currentStream;
59+
private S3ObjectInputStream currentStream;
60+
private long currentStreamLastOffset;
5861
private int attempt = 1;
59-
private List<IOException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
6062
private long currentOffset;
6163
private boolean closed;
64+
private boolean eof;
6265

6366
S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException {
6467
this(blobStore, blobKey, 0, Long.MAX_VALUE - 1);
@@ -75,12 +78,13 @@ class S3RetryingInputStream extends InputStream {
7578
this.blobStore = blobStore;
7679
this.blobKey = blobKey;
7780
this.maxAttempts = blobStore.getMaxRetries() + 1;
81+
this.failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
7882
this.start = start;
7983
this.end = end;
80-
currentStream = openStream();
84+
openStream();
8185
}
8286

83-
private InputStream openStream() throws IOException {
87+
private void openStream() throws IOException {
8488
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
8589
final GetObjectRequest getObjectRequest = new GetObjectRequest(blobStore.bucket(), blobKey);
8690
getObjectRequest.setRequestMetricCollector(blobStore.getMetricCollector);
@@ -90,7 +94,8 @@ private InputStream openStream() throws IOException {
9094
getObjectRequest.setRange(Math.addExact(start, currentOffset), end);
9195
}
9296
final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
93-
return s3Object.getObjectContent();
97+
this.currentStreamLastOffset = Math.addExact(Math.addExact(start, currentOffset), getStreamLength(s3Object));
98+
this.currentStream = s3Object.getObjectContent();
9499
} catch (final AmazonClientException e) {
95100
if (e instanceof AmazonS3Exception) {
96101
if (404 == ((AmazonS3Exception) e).getStatusCode()) {
@@ -101,12 +106,35 @@ private InputStream openStream() throws IOException {
101106
}
102107
}
103108

109+
/**
 * Computes how many bytes the content stream of the given S3 object is expected to deliver.
 * The Content-Range bounds from the object metadata are used when present; otherwise the
 * metadata's content length is returned. If reading the metadata throws, an assertion fails
 * (when assertions are enabled) and {@code Long.MAX_VALUE - 1} is returned instead.
 *
 * @param object the S3 object whose metadata is inspected
 * @return the expected number of bytes in the object's content stream
 */
private long getStreamLength(final S3Object object) {
110+
final ObjectMetadata metadata = object.getObjectMetadata();
111+
try {
112+
// Returns the content range of the object if response contains the Content-Range header.
113+
final Long[] range = metadata.getContentRange();
114+
if (range != null) {
115+
// Sanity checks: the returned range must be well-formed and must match the requested
116+
// [start + currentOffset, end] window exactly.
assert range[1] >= range[0] : range[1] + " vs " + range[0];
116+
assert range[0] == start + currentOffset :
117+
"Content-Range start value [" + range[0] + "] exceeds start [" + start + "] + current offset [" + currentOffset + ']';
118+
assert range[1] == end : "Content-Range end value [" + range[1] + "] exceeds end [" + end + ']';
119+
// range[0] and range[1] are treated as inclusive bounds, hence the + 1.
return range[1] - range[0] + 1L;
120+
}
121+
// No Content-Range in the metadata: fall back to the content length.
return metadata.getContentLength();
122+
} catch (Exception e) {
123+
assert false : e;
124+
return Long.MAX_VALUE - 1L; // assume a large stream so that the underlying stream is aborted on closing, unless eof is reached
125+
}
126+
}
127+
104128
@Override
105129
public int read() throws IOException {
106130
ensureOpen();
107131
while (true) {
108132
try {
109133
final int result = currentStream.read();
134+
if (result == -1) {
135+
eof = true;
136+
return -1;
137+
}
110138
currentOffset += 1;
111139
return result;
112140
} catch (IOException e) {
@@ -122,6 +150,7 @@ public int read(byte[] b, int off, int len) throws IOException {
122150
try {
123151
final int bytesRead = currentStream.read(b, off, len);
124152
if (bytesRead == -1) {
153+
eof = true;
125154
return -1;
126155
}
127156
currentOffset += bytesRead;
@@ -151,24 +180,36 @@ private void reopenStreamOrFail(IOException e) throws IOException {
151180
if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
152181
failures.add(e);
153182
}
154-
try {
155-
Streams.consumeFully(currentStream);
156-
} catch (Exception e2) {
157-
logger.trace("Failed to fully consume stream on close", e);
158-
}
183+
maybeAbort(currentStream);
159184
IOUtils.closeWhileHandlingException(currentStream);
160-
currentStream = openStream();
185+
openStream();
161186
}
162187

163188
@Override
164189
public void close() throws IOException {
190+
maybeAbort(currentStream);
165191
try {
166-
Streams.consumeFully(currentStream);
192+
currentStream.close();
193+
} finally {
194+
closed = true;
195+
}
196+
}
197+
198+
/**
199+
* Abort the {@link S3ObjectInputStream} if it wasn't read completely at the time this method is called,
200+
* suppressing all thrown exceptions.
201+
*/
202+
private void maybeAbort(S3ObjectInputStream stream) {
203+
if (eof) {
204+
return;
205+
}
206+
try {
207+
if (start + currentOffset < currentStreamLastOffset) {
208+
stream.abort();
209+
}
167210
} catch (Exception e) {
168-
logger.trace("Failed to fully consume stream on close", e);
211+
logger.warn("Failed to abort stream before closing", e);
169212
}
170-
currentStream.close();
171-
closed = true;
172213
}
173214

174215
@Override
@@ -187,4 +228,17 @@ private <T extends Exception> T addSuppressedExceptions(T e) {
187228
}
188229
return e;
189230
}
231+
232+
// package-private for tests
// Returns the eof flag, which the read methods set once the current stream returns -1.
233+
boolean isEof() {
234+
return eof;
235+
}
236+
237+
// package-private for tests
// Reports whether the HTTP request attached to the current S3 stream has been aborted.
// Returns false when there is no current stream or the stream has no associated HTTP request.
238+
boolean isAborted() {
239+
if (currentStream == null || currentStream.getHttpRequest() == null) {
240+
return false;
241+
}
242+
return currentStream.getHttpRequest().isAborted();
243+
}
190244
}

plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import org.junit.After;
4444
import org.junit.Before;
4545

46+
import java.io.ByteArrayInputStream;
47+
import java.io.FilterInputStream;
4648
import java.io.IOException;
4749
import java.io.InputStream;
4850
import java.net.InetSocketAddress;
@@ -133,7 +135,17 @@ protected BlobContainer createBlobContainer(final @Nullable Integer maxRetries,
133135
bufferSize == null ? S3Repository.BUFFER_SIZE_SETTING.getDefault(Settings.EMPTY) : bufferSize,
134136
S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
135137
S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
136-
repositoryMetadata));
138+
repositoryMetadata)) {
139+
@Override
140+
public InputStream readBlob(String blobName) throws IOException {
141+
return new AssertingInputStream(super.readBlob(blobName), blobName);
142+
}
143+
144+
@Override
145+
public InputStream readBlob(String blobName, long position, long length) throws IOException {
146+
return new AssertingInputStream(super.readBlob(blobName, position, length), blobName, position, length);
147+
}
148+
};
137149
}
138150

139151
public void testWriteBlobWithRetries() throws Exception {
@@ -292,4 +304,55 @@ public void testWriteLargeBlob() throws Exception {
292304
assertThat(countDownUploads.get(), equalTo(0));
293305
assertThat(countDownComplete.isCountedDown(), is(true));
294306
}
307+
308+
/**
309+
* Asserts that an InputStream is fully consumed, or aborted, when it is closed
310+
*/
311+
private static class AssertingInputStream extends FilterInputStream {
312+
313+
// Name of the blob being read; only used to build the description in toString().
private final String blobName;
314+
// True only when the instance was built through the (position, length) constructor.
private final boolean range;
315+
// Requested start position and length of the read; only printed when range is true.
private final long position;
316+
private final long length;
317+
318+
// Wraps a stream that reads a whole blob (no range requested).
AssertingInputStream(InputStream in, String blobName) {
319+
super(in);
320+
this.blobName = blobName;
321+
this.position = 0L;
322+
this.length = Long.MAX_VALUE;
323+
this.range = false;
324+
}
325+
326+
// Wraps a stream that reads the given range of a blob.
AssertingInputStream(InputStream in, String blobName, long position, long length) {
327+
super(in);
328+
this.blobName = blobName;
329+
this.position = position;
330+
this.length = length;
331+
this.range = true;
332+
}
333+
334+
// Builds the description used in the assertion failure message of close().
@Override
335+
public String toString() {
336+
String description = "[blobName='" + blobName + "', range=" + range;
337+
if (range) {
338+
description += ", position=" + position;
339+
description += ", length=" + length;
340+
}
341+
description += ']';
342+
return description;
343+
}
344+
345+
// Closes the wrapped stream first, then checks the state it was left in:
// an S3RetryingInputStream must have reached EOF or been aborted; any other stream
// is expected to be a ByteArrayInputStream with no bytes left to read.
@Override
346+
public void close() throws IOException {
347+
super.close();
348+
if (in instanceof S3RetryingInputStream) {
349+
final S3RetryingInputStream s3Stream = (S3RetryingInputStream) in;
350+
assertTrue("Stream " + toString() + " should have reached EOF or should have been aborted but got [eof=" + s3Stream.isEof()
351+
+ ", aborted=" + s3Stream.isAborted() + ']', s3Stream.isEof() || s3Stream.isAborted());
352+
} else {
353+
assertThat(in, instanceOf(ByteArrayInputStream.class));
354+
assertThat(((ByteArrayInputStream) in).available(), equalTo(0));
355+
}
356+
}
357+
}
295358
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.repositories.s3;
21+
22+
import com.amazonaws.services.s3.AmazonS3;
23+
import com.amazonaws.services.s3.model.GetObjectRequest;
24+
import com.amazonaws.services.s3.model.S3Object;
25+
import com.amazonaws.services.s3.model.S3ObjectInputStream;
26+
import org.apache.http.client.methods.HttpGet;
27+
import org.elasticsearch.common.Nullable;
28+
import org.elasticsearch.common.io.Streams;
29+
import org.elasticsearch.test.ESTestCase;
30+
31+
import java.io.ByteArrayInputStream;
32+
import java.io.IOException;
33+
import java.util.Arrays;
34+
35+
import static org.hamcrest.Matchers.is;
36+
import static org.mockito.Matchers.any;
37+
import static org.mockito.Mockito.mock;
38+
import static org.mockito.Mockito.when;
39+
40+
/**
 * Unit tests checking the eof/aborted state of an {@link S3RetryingInputStream} after it has been
 * either fully consumed or closed early. The S3 client and blob store are Mockito mocks that
 * serve an in-memory byte array.
 */
public class S3RetryingInputStreamTests extends ESTestCase {
41+
42+
// Consuming a whole-blob stream to the end must set eof and must not abort the request.
public void testInputStreamFullyConsumed() throws IOException {
43+
final byte[] expectedBytes = randomByteArrayOfLength(randomIntBetween(1, 512));
44+
45+
final S3RetryingInputStream stream = createInputStream(expectedBytes, null, null);
46+
Streams.consumeFully(stream);
47+
48+
assertThat(stream.isEof(), is(true));
49+
assertThat(stream.isAborted(), is(false));
50+
}
51+
52+
// Closing a whole-blob stream after reading fewer bytes than the blob holds must abort the request.
public void testInputStreamIsAborted() throws IOException {
53+
final byte[] expectedBytes = randomByteArrayOfLength(randomIntBetween(10, 512));
54+
// The read buffer is always at least one byte shorter than the blob, so EOF is never reached.
final byte[] actualBytes = new byte[randomIntBetween(1, Math.max(1, expectedBytes.length - 1))];
55+
56+
final S3RetryingInputStream stream = createInputStream(expectedBytes, null, null);
57+
stream.read(actualBytes);
58+
stream.close();
59+
60+
assertArrayEquals(Arrays.copyOf(expectedBytes, actualBytes.length), actualBytes);
61+
assertThat(stream.isEof(), is(false));
62+
assertThat(stream.isAborted(), is(true));
63+
}
64+
65+
// Consuming a ranged stream to the end must set eof and must not abort the request.
public void testRangeInputStreamFullyConsumed() throws IOException {
66+
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, 512));
67+
final int position = randomIntBetween(0, bytes.length - 1);
68+
final int length = randomIntBetween(1, bytes.length - position);
69+
70+
final S3RetryingInputStream stream = createInputStream(bytes, position, length);
71+
Streams.consumeFully(stream);
72+
73+
assertThat(stream.isEof(), is(true));
74+
assertThat(stream.isAborted(), is(false));
75+
}
76+
77+
// Closing a ranged stream after reading fewer bytes than the range holds must abort the request.
public void testRangeInputStreamIsAborted() throws IOException {
78+
final byte[] expectedBytes = randomByteArrayOfLength(randomIntBetween(10, 512));
79+
final byte[] actualBytes = new byte[randomIntBetween(1, Math.max(1, expectedBytes.length - 1))];
80+
81+
// The range is always longer than the read buffer, so the range is never fully read.
final int length = randomIntBetween(actualBytes.length + 1, expectedBytes.length);
82+
// NOTE(review): when length == expectedBytes.length, Math.max(1, 0) still allows position == 1,
// in which case position + length exceeds the array and the mocked content holds one byte less
// than the declared content length. The assertions below still hold in that case, but
// Math.max(0, ...) looks like what was intended - confirm.
final int position = randomIntBetween(0, Math.max(1, expectedBytes.length - length));
83+
84+
final S3RetryingInputStream stream = createInputStream(expectedBytes, position, length);
85+
stream.read(actualBytes);
86+
stream.close();
87+
88+
assertArrayEquals(Arrays.copyOfRange(expectedBytes, position, position + actualBytes.length), actualBytes);
89+
assertThat(stream.isEof(), is(false));
90+
assertThat(stream.isAborted(), is(true));
91+
}
92+
93+
// Builds an S3RetryingInputStream backed by mocks: the mocked client returns a single S3Object
// whose content is the given byte array (or the [position, position + length) slice of it when
// both position and length are provided) and whose metadata content length is set accordingly.
private S3RetryingInputStream createInputStream(
94+
final byte[] data,
95+
@Nullable final Integer position,
96+
@Nullable final Integer length
97+
) throws IOException {
98+
final S3Object s3Object = new S3Object();
99+
final AmazonS3 client = mock(AmazonS3.class);
100+
when(client.getObject(any(GetObjectRequest.class))).thenReturn(s3Object);
101+
final AmazonS3Reference clientReference = mock(AmazonS3Reference.class);
102+
when(clientReference.client()).thenReturn(client);
103+
final S3BlobStore blobStore = mock(S3BlobStore.class);
104+
when(blobStore.clientReference()).thenReturn(clientReference);
105+
106+
if (position != null && length != null) {
107+
s3Object.getObjectMetadata().setContentLength(length);
108+
s3Object.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(data, position, length), new HttpGet()));
109+
// The stream constructor takes an inclusive end offset, hence length - 1.
return new S3RetryingInputStream(blobStore, "_blob", position, Math.addExact(position, length - 1));
110+
} else {
111+
s3Object.getObjectMetadata().setContentLength(data.length);
112+
s3Object.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(data), new HttpGet()));
113+
return new S3RetryingInputStream(blobStore, "_blob");
114+
}
115+
}
116+
}

0 commit comments

Comments
 (0)