
Commit 4740437

Add Blob Download Retries to GCS Repository (elastic#52479)

Exactly as elastic#46589 (and kept as close to it as possible code-wise, so we can potentially dry things up in a follow-up), but for GCS.

Closes elastic#52319

1 parent 3afb5ca / commit 4740437

File tree

3 files changed: +211 -31 lines
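For context before the per-file diffs: the change is transparent to callers of the blob container, which still get a plain InputStream back from readBlob; retries and resumption now happen inside that stream. A minimal, hypothetical usage sketch follows (the helper class, container variable, and blob name are illustrative and not part of this commit):

import java.io.IOException;
import java.io.InputStream;

import org.elasticsearch.common.blobstore.BlobContainer;

class ReadBlobSketch {
    // Hypothetical caller-side view: transient StorageExceptions thrown while the
    // blob is being downloaded are retried internally, resuming at the current offset,
    // so the caller simply reads the stream to the end as before.
    static byte[] readWholeBlob(BlobContainer gcsContainer) throws IOException {
        try (InputStream in = gcsContainer.readBlob("snap-hypothetical.dat")) {
            return in.readAllBytes();
        }
    }
}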


plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

+1 -31
@@ -21,7 +21,6 @@
 
 import com.google.api.gax.paging.Page;
 import com.google.cloud.BatchResult;
-import com.google.cloud.ReadChannel;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
@@ -34,7 +33,6 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
@@ -47,11 +45,8 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
 import java.nio.file.FileAlreadyExistsException;
-import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -176,32 +171,7 @@ Map<String, BlobContainer> listChildren(BlobPath path) throws IOException {
      * @return the InputStream used to read the blob's content
      */
     InputStream readBlob(String blobName) throws IOException {
-        final BlobId blobId = BlobId.of(bucketName, blobName);
-        final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client().reader(blobId));
-        return Channels.newInputStream(new ReadableByteChannel() {
-            @SuppressForbidden(reason = "Channel is based of a socket not a file")
-            @Override
-            public int read(ByteBuffer dst) throws IOException {
-                try {
-                    return SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
-                } catch (StorageException e) {
-                    if (e.getCode() == HTTP_NOT_FOUND) {
-                        throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
-                    }
-                    throw e;
-                }
-            }
-
-            @Override
-            public boolean isOpen() {
-                return readChannel.isOpen();
-            }
-
-            @Override
-            public void close() throws IOException {
-                SocketAccess.doPrivilegedVoidIOException(readChannel::close);
-            }
-        });
+        return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName));
     }
 
     /**
plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java

+181 (new file)
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.repositories.gcs;

import com.google.cloud.ReadChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

/**
 * Wrapper around reads from GCS that will retry blob downloads that fail part-way through, resuming from where the failure occurred.
 * This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing
 * the {@link org.elasticsearch.Version#V_7_0_0} version constant) and removed if the SDK handles retries itself in the future.
 */
class GoogleCloudStorageRetryingInputStream extends InputStream {

    private static final Logger logger = LogManager.getLogger(GoogleCloudStorageRetryingInputStream.class);

    static final int MAX_SUPPRESSED_EXCEPTIONS = 10;

    private final Storage client;

    private final BlobId blobId;

    private final int maxRetries;

    private InputStream currentStream;
    private int attempt = 1;
    private List<StorageException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
    private long currentOffset;
    private boolean closed;

    GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId) throws IOException {
        this.client = client;
        this.blobId = blobId;
        this.maxRetries = client.getOptions().getRetrySettings().getMaxAttempts() + 1;
        currentStream = openStream();
    }

    private InputStream openStream() throws IOException {
        try {
            final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client.reader(blobId));
            if (currentOffset > 0L) {
                readChannel.seek(currentOffset);
            }
            return Channels.newInputStream(new ReadableByteChannel() {
                @SuppressForbidden(reason = "Channel is based of a socket not a file")
                @Override
                public int read(ByteBuffer dst) throws IOException {
                    try {
                        return SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
                    } catch (StorageException e) {
                        if (e.getCode() == HTTP_NOT_FOUND) {
                            throw new NoSuchFileException("Blob [" + blobId.getName() + "] does not exist");
                        }
                        throw e;
                    }
                }

                @Override
                public boolean isOpen() {
                    return readChannel.isOpen();
                }

                @Override
                public void close() throws IOException {
                    SocketAccess.doPrivilegedVoidIOException(readChannel::close);
                }
            });
        } catch (StorageException e) {
            throw addSuppressedExceptions(e);
        }
    }

    @Override
    public int read() throws IOException {
        ensureOpen();
        while (true) {
            try {
                final int result = currentStream.read();
                currentOffset += 1;
                return result;
            } catch (StorageException e) {
                reopenStreamOrFail(e);
            }
        }
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        ensureOpen();
        while (true) {
            try {
                final int bytesRead = currentStream.read(b, off, len);
                if (bytesRead == -1) {
                    return -1;
                }
                currentOffset += bytesRead;
                return bytesRead;
            } catch (StorageException e) {
                reopenStreamOrFail(e);
            }
        }
    }

    private void ensureOpen() {
        if (closed) {
            assert false : "using GoogleCloudStorageRetryingInputStream after close";
            throw new IllegalStateException("using GoogleCloudStorageRetryingInputStream after close");
        }
    }

    private void reopenStreamOrFail(StorageException e) throws IOException {
        if (attempt >= maxRetries) {
            throw addSuppressedExceptions(e);
        }
        logger.debug(new ParameterizedMessage("failed reading [{}] at offset [{}], attempt [{}] of [{}], retrying",
            blobId, currentOffset, attempt, MAX_SUPPRESSED_EXCEPTIONS), e);
        attempt += 1;
        if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
            failures.add(e);
        }
        IOUtils.closeWhileHandlingException(currentStream);
        currentStream = openStream();
    }

    @Override
    public void close() throws IOException {
        currentStream.close();
        closed = true;
    }

    @Override
    public long skip(long n) {
        throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking");
    }

    @Override
    public void reset() {
        throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking");
    }

    private <T extends Exception> T addSuppressedExceptions(T e) {
        for (StorageException failure : failures) {
            e.addSuppressed(failure);
        }
        return e;
    }
}
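The retry budget used by the constructor above comes from the SDK client's own retry settings (getMaxAttempts() + 1, i.e. the initial open plus the configured number of retries). As a minimal sketch of where that value originates, here is how a Storage client could be built with explicit retry settings using the raw google-cloud-storage API; the builder values are illustrative, and this is not how the plugin itself constructs its client:

import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

class RetrySettingsSketch {
    // Illustrative only: with maxAttempts = 3, GoogleCloudStorageRetryingInputStream
    // computes maxRetries = 4 and will open the read channel at most four times
    // (the initial attempt plus three retries) before propagating the failure.
    static Storage buildClient() {
        StorageOptions options = StorageOptions.newBuilder()
                .setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(3).build())
                .build();
        return options.getService();
    }
}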

plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

+29
@@ -200,6 +200,35 @@ public void testReadBlobWithRetries() throws Exception {
         }
     }
 
+    public void testReadLargeBlobWithRetries() throws Exception {
+        final int maxRetries = randomIntBetween(2, 10);
+        final CountDown countDown = new CountDown(maxRetries);
+
+        // SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
+        final byte[] bytes = randomBytes(1 << 22);
+        httpServer.createContext("/download/storage/v1/b/bucket/o/large_blob_retries", exchange -> {
+            Streams.readFully(exchange.getRequestBody());
+            exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
+            final String[] range = exchange.getRequestHeaders().get("Range").get(0).substring("bytes=".length()).split("-");
+            final int offset = Integer.parseInt(range[0]);
+            final int end = Integer.parseInt(range[1]);
+            final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.min(end + 1, bytes.length));
+            exchange.sendResponseHeaders(RestStatus.OK.getStatus(), chunk.length);
+            if (randomBoolean() && countDown.countDown() == false) {
+                exchange.getResponseBody().write(chunk, 0, chunk.length - 1);
+                exchange.close();
+                return;
+            }
+            exchange.getResponseBody().write(chunk);
+            exchange.close();
+        });
+
+        final BlobContainer blobContainer = createBlobContainer(maxRetries, null);
+        try (InputStream inputStream = blobContainer.readBlob("large_blob_retries")) {
+            assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
+        }
+    }
+
     public void testReadBlobWithReadTimeouts() {
         final int maxRetries = randomIntBetween(1, 3);
         final BlobContainer blobContainer = createBlobContainer(maxRetries, TimeValue.timeValueMillis(between(100, 200)));
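For concreteness, here is the arithmetic the handler above performs when the SDK resumes a download part-way through the 4 MB (1 << 22 = 4194304 byte) test blob. The Range header value below is hypothetical, since the SDK decides which ranges it actually requests:

class RangeArithmeticSketch {
    // Mirrors the test handler above: parse "bytes=<start>-<end>" and serve
    // Arrays.copyOfRange(bytes, offset, Math.min(end + 1, bytes.length)).
    static int servedChunkLength(String rangeHeader, int blobLength) {
        String[] range = rangeHeader.substring("bytes=".length()).split("-");
        int offset = Integer.parseInt(range[0]);
        int end = Integer.parseInt(range[1]);
        return Math.min(end + 1, blobLength) - offset;
    }

    public static void main(String[] args) {
        // Hypothetical resumed request after roughly the first 2 MB were already consumed:
        // min(4194303 + 1, 4194304) - 2097152 = 2097152 bytes are copied out of the blob and served.
        System.out.println(servedChunkLength("bytes=2097152-4194303", 1 << 22));
    }
}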
