Skip to content

Commit e6815fa

Browse files
Add Blob Download Retries to GCS Repository
Exactly as elastic#46589 (and kept as close to it as possible code-wise, so we can potentially DRY the two implementations up in a follow-up) but for GCS. Closes elastic#52319
1 parent c4cc68e commit e6815fa

File tree

3 files changed

+209
-31
lines changed

3 files changed

+209
-31
lines changed

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

+1-31
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import com.google.api.gax.paging.Page;
2323
import com.google.cloud.BatchResult;
24-
import com.google.cloud.ReadChannel;
2524
import com.google.cloud.storage.Blob;
2625
import com.google.cloud.storage.BlobId;
2726
import com.google.cloud.storage.BlobInfo;
@@ -34,7 +33,6 @@
3433
import org.apache.logging.log4j.Logger;
3534
import org.apache.logging.log4j.message.ParameterizedMessage;
3635
import org.elasticsearch.ExceptionsHelper;
37-
import org.elasticsearch.common.SuppressForbidden;
3836
import org.elasticsearch.common.blobstore.BlobContainer;
3937
import org.elasticsearch.common.blobstore.BlobMetaData;
4038
import org.elasticsearch.common.blobstore.BlobPath;
@@ -47,11 +45,8 @@
4745

4846
import java.io.IOException;
4947
import java.io.InputStream;
50-
import java.nio.ByteBuffer;
5148
import java.nio.channels.Channels;
52-
import java.nio.channels.ReadableByteChannel;
5349
import java.nio.file.FileAlreadyExistsException;
54-
import java.nio.file.NoSuchFileException;
5550
import java.util.ArrayList;
5651
import java.util.Collection;
5752
import java.util.Collections;
@@ -176,32 +171,7 @@ Map<String, BlobContainer> listChildren(BlobPath path) throws IOException {
176171
* @return the InputStream used to read the blob's content
177172
*/
178173
InputStream readBlob(String blobName) throws IOException {
179-
final BlobId blobId = BlobId.of(bucketName, blobName);
180-
final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client().reader(blobId));
181-
return Channels.newInputStream(new ReadableByteChannel() {
182-
@SuppressForbidden(reason = "Channel is based of a socket not a file")
183-
@Override
184-
public int read(ByteBuffer dst) throws IOException {
185-
try {
186-
return SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
187-
} catch (StorageException e) {
188-
if (e.getCode() == HTTP_NOT_FOUND) {
189-
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
190-
}
191-
throw e;
192-
}
193-
}
194-
195-
@Override
196-
public boolean isOpen() {
197-
return readChannel.isOpen();
198-
}
199-
200-
@Override
201-
public void close() throws IOException {
202-
SocketAccess.doPrivilegedVoidIOException(readChannel::close);
203-
}
204-
});
174+
return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName));
205175
}
206176

207177
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.repositories.gcs;
20+
21+
import com.google.cloud.ReadChannel;
22+
import com.google.cloud.storage.BlobId;
23+
import com.google.cloud.storage.Storage;
24+
import com.google.cloud.storage.StorageException;
25+
import org.apache.logging.log4j.LogManager;
26+
import org.apache.logging.log4j.Logger;
27+
import org.apache.logging.log4j.message.ParameterizedMessage;
28+
import org.elasticsearch.common.SuppressForbidden;
29+
import org.elasticsearch.core.internal.io.IOUtils;
30+
31+
import java.io.IOException;
32+
import java.io.InputStream;
33+
import java.nio.ByteBuffer;
34+
import java.nio.channels.Channels;
35+
import java.nio.channels.ReadableByteChannel;
36+
import java.nio.file.NoSuchFileException;
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
40+
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
41+
42+
/**
43+
* Wrapper around reads from GCS that will retry blob downloads that fail part-way through, resuming from where the failure occurred.
44+
* This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing
45+
* the {@link org.elasticsearch.Version#V_7_0_0} version constant) and removed if the SDK handles retries itself in the future.
46+
*/
47+
class GoogleCloudStorageRetryingInputStream extends InputStream {
48+
49+
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageRetryingInputStream.class);
50+
51+
static final int MAX_SUPPRESSED_EXCEPTIONS = 10;
52+
53+
private final Storage client;
54+
55+
private final BlobId blobId;
56+
57+
private final int maxRetries;
58+
59+
private InputStream currentStream;
60+
private int attempt = 1;
61+
private List<StorageException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
62+
private long currentOffset;
63+
private boolean closed;
64+
65+
GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId) throws IOException {
66+
this.client = client;
67+
this.blobId = blobId;
68+
this.maxRetries = client.getOptions().getRetrySettings().getMaxAttempts() + 1;
69+
currentStream = openStream();
70+
}
71+
72+
private InputStream openStream() throws IOException {
73+
try {
74+
final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client.reader(blobId));
75+
readChannel.seek(currentOffset);
76+
return Channels.newInputStream(new ReadableByteChannel() {
77+
@SuppressForbidden(reason = "Channel is based of a socket not a file")
78+
@Override
79+
public int read(ByteBuffer dst) throws IOException {
80+
try {
81+
return SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
82+
} catch (StorageException e) {
83+
if (e.getCode() == HTTP_NOT_FOUND) {
84+
throw new NoSuchFileException("Blob [" + blobId.getName() + "] does not exist");
85+
}
86+
throw e;
87+
}
88+
}
89+
90+
@Override
91+
public boolean isOpen() {
92+
return readChannel.isOpen();
93+
}
94+
95+
@Override
96+
public void close() throws IOException {
97+
SocketAccess.doPrivilegedVoidIOException(readChannel::close);
98+
}
99+
});
100+
} catch (StorageException e) {
101+
throw addSuppressedExceptions(e);
102+
}
103+
}
104+
105+
@Override
106+
public int read() throws IOException {
107+
ensureOpen();
108+
while (true) {
109+
try {
110+
final int result = currentStream.read();
111+
currentOffset += 1;
112+
return result;
113+
} catch (StorageException e) {
114+
reopenStreamOrFail(e);
115+
}
116+
}
117+
}
118+
119+
@Override
120+
public int read(byte[] b, int off, int len) throws IOException {
121+
ensureOpen();
122+
while (true) {
123+
try {
124+
final int bytesRead = currentStream.read(b, off, len);
125+
if (bytesRead == -1) {
126+
return -1;
127+
}
128+
currentOffset += bytesRead;
129+
return bytesRead;
130+
} catch (StorageException e) {
131+
reopenStreamOrFail(e);
132+
}
133+
}
134+
}
135+
136+
private void ensureOpen() {
137+
if (closed) {
138+
assert false : "using GoogleCloudStorageRetryingInputStream after close";
139+
throw new IllegalStateException("using GoogleCloudStorageRetryingInputStream after close");
140+
}
141+
}
142+
143+
private void reopenStreamOrFail(StorageException e) throws IOException {
144+
if (attempt >= maxRetries) {
145+
throw addSuppressedExceptions(e);
146+
}
147+
logger.debug(new ParameterizedMessage("failed reading [{}] at offset [{}], attempt [{}] of [{}], retrying",
148+
blobId, currentOffset, attempt, MAX_SUPPRESSED_EXCEPTIONS), e);
149+
attempt += 1;
150+
if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
151+
failures.add(e);
152+
}
153+
IOUtils.closeWhileHandlingException(currentStream);
154+
currentStream = openStream();
155+
}
156+
157+
@Override
158+
public void close() throws IOException {
159+
currentStream.close();
160+
closed = true;
161+
}
162+
163+
@Override
164+
public long skip(long n) {
165+
throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking");
166+
}
167+
168+
@Override
169+
public void reset() {
170+
throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking");
171+
}
172+
173+
private <T extends Exception> T addSuppressedExceptions(T e) {
174+
for (StorageException failure : failures) {
175+
e.addSuppressed(failure);
176+
}
177+
return e;
178+
}
179+
}

plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

+29
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,35 @@ public void testReadBlobWithRetries() throws Exception {
200200
}
201201
}
202202

203+
// Verifies that a multi-chunk download survives truncated responses: the handler below randomly
// serves short (one byte missing) ranges until the countdown is exhausted, and the retrying
// stream must resume via Range requests until the full blob is read intact.
public void testReadLargeBlobWithRetries() throws Exception {
    final int maxRetries = randomIntBetween(2, 10);
    final CountDown countDown = new CountDown(maxRetries);

    // SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
    final byte[] bytes = randomBytes(1 << 22);
    httpServer.createContext("/download/storage/v1/b/bucket/o/large_blob_retries", exchange -> {
        Streams.readFully(exchange.getRequestBody());
        exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
        // Parse the requested byte range ("bytes=<start>-<end>") to serve the right slice,
        // clamping the end to the blob length since the SDK may ask past the final byte.
        final String[] range = exchange.getRequestHeaders().get("Range").get(0).substring("bytes=".length()).split("-");
        final int offset = Integer.parseInt(range[0]);
        final int end = Integer.parseInt(range[1]);
        final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.min(end + 1, bytes.length));
        exchange.sendResponseHeaders(RestStatus.OK.getStatus(), chunk.length);
        if (randomBoolean() && countDown.countDown() == false) {
            // Simulate a partial download: advertise chunk.length but write one byte less,
            // forcing the client to detect the truncation and retry from the resume offset.
            exchange.getResponseBody().write(chunk, 0, chunk.length - 1);
            exchange.close();
            return;
        }
        exchange.getResponseBody().write(chunk);
        exchange.close();
    });

    final BlobContainer blobContainer = createBlobContainer(maxRetries, null);
    try (InputStream inputStream = blobContainer.readBlob("large_blob_retries")) {
        // The reassembled stream must match the original bytes exactly despite the injected failures
        assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
    }
}
231+
203232
public void testReadBlobWithReadTimeouts() {
204233
final int maxRetries = randomIntBetween(1, 3);
205234
final BlobContainer blobContainer = createBlobContainer(maxRetries, TimeValue.timeValueMillis(between(100, 200)));

0 commit comments

Comments
 (0)