Skip to content

Commit 58c62c5

Browse files
Fix Bug in Azure Repo Exception Handling (#47968)
We were incorrectly handling `IOExceptions` thrown by the `InputStream` side of the upload operation, resulting in a `ClassCastException` as we expected to never get `IOException` from the Azure SDK code but we do in practice. This PR also sets an assertion on `markSupported` for the streams used by the SDK as adding the test for this scenario revealed that the SDK client would retry uploads for non-mark-supporting streams on `IOException` in the `InputStream`.
1 parent a4df071 commit 58c62c5

File tree

5 files changed

+59
-20
lines changed

5 files changed

+59
-20
lines changed

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private boolean blobExists(String blobName) {
6262
logger.trace("blobExists({})", blobName);
6363
try {
6464
return blobStore.blobExists(buildKey(blobName));
65-
} catch (URISyntaxException | StorageException e) {
65+
} catch (URISyntaxException | StorageException | IOException e) {
6666
logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore, e.getMessage());
6767
}
6868
return false;
@@ -97,7 +97,6 @@ public InputStream readBlob(String blobName) throws IOException {
9797
@Override
9898
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
9999
logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);
100-
101100
try {
102101
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
103102
} catch (URISyntaxException|StorageException e) {

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.io.IOException;
3434
import java.io.InputStream;
3535
import java.net.URISyntaxException;
36-
import java.nio.file.FileAlreadyExistsException;
3736
import java.util.Collections;
3837
import java.util.Map;
3938
import java.util.concurrent.Executor;
@@ -88,11 +87,11 @@ public BlobContainer blobContainer(BlobPath path) {
8887
public void close() {
8988
}
9089

91-
public boolean blobExists(String blob) throws URISyntaxException, StorageException {
90+
public boolean blobExists(String blob) throws URISyntaxException, StorageException, IOException {
9291
return service.blobExists(clientName, container, blob);
9392
}
9493

95-
public void deleteBlob(String blob) throws URISyntaxException, StorageException {
94+
public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException {
9695
service.deleteBlob(clientName, container, blob);
9796
}
9897

@@ -106,17 +105,17 @@ public InputStream getInputStream(String blob) throws URISyntaxException, Storag
106105
}
107106

108107
public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix)
109-
throws URISyntaxException, StorageException {
108+
throws URISyntaxException, StorageException, IOException {
110109
return service.listBlobsByPrefix(clientName, container, keyPath, prefix);
111110
}
112111

113-
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException {
112+
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException {
114113
return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
115114
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool))));
116115
}
117116

118117
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
119-
throws URISyntaxException, StorageException, FileAlreadyExistsException {
118+
throws URISyntaxException, StorageException, IOException {
120119
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists);
121120
}
122121
}

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ public InputStream getInputStream(String account, String container, String blob)
267267
}
268268

269269
public Map<String, BlobMetaData> listBlobsByPrefix(String account, String container, String keyPath, String prefix)
270-
throws URISyntaxException, StorageException {
270+
throws URISyntaxException, StorageException, IOException {
271271
// NOTE: this should be here: if (prefix == null) prefix = "";
272272
// however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
273273
// then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!
@@ -295,7 +295,7 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, String contai
295295
return Map.copyOf(blobsBuilder);
296296
}
297297

298-
public Set<String> children(String account, String container, BlobPath path) throws URISyntaxException, StorageException {
298+
public Set<String> children(String account, String container, BlobPath path) throws URISyntaxException, StorageException, IOException {
299299
final var blobsBuilder = new HashSet<String>();
300300
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
301301
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
@@ -319,8 +319,9 @@ public Set<String> children(String account, String container, BlobPath path) thr
319319
}
320320

321321
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
322-
boolean failIfAlreadyExists)
323-
throws URISyntaxException, StorageException, FileAlreadyExistsException {
322+
boolean failIfAlreadyExists) throws URISyntaxException, StorageException, IOException {
323+
assert inputStream.markSupported()
324+
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
324325
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
325326
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
326327
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.repositories.azure;
2121

2222
import com.microsoft.azure.storage.StorageException;
23+
import org.apache.logging.log4j.core.util.Throwables;
2324
import org.elasticsearch.SpecialPermission;
2425

2526
import java.io.IOException;
@@ -44,7 +45,9 @@ public static <T> T doPrivilegedIOException(PrivilegedExceptionAction<T> operati
4445
try {
4546
return AccessController.doPrivileged(operation);
4647
} catch (PrivilegedActionException e) {
47-
throw (IOException) e.getCause();
48+
Throwables.rethrow(e.getCause());
49+
assert false : "always throws";
50+
return null;
4851
}
4952
}
5053

@@ -53,7 +56,9 @@ public static <T> T doPrivilegedException(PrivilegedExceptionAction<T> operation
5356
try {
5457
return AccessController.doPrivileged(operation);
5558
} catch (PrivilegedActionException e) {
56-
throw (StorageException) e.getCause();
59+
Throwables.rethrow(e.getCause());
60+
assert false : "always throws";
61+
return null;
5762
}
5863
}
5964

@@ -65,12 +70,7 @@ public static void doPrivilegedVoidException(StorageRunnable action) throws Stor
6570
return null;
6671
});
6772
} catch (PrivilegedActionException e) {
68-
Throwable cause = e.getCause();
69-
if (cause instanceof StorageException) {
70-
throw (StorageException) cause;
71-
} else {
72-
throw (URISyntaxException) cause;
73-
}
73+
Throwables.rethrow(e.getCause());
7474
}
7575
}
7676

plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.junit.Before;
5050

5151
import java.io.ByteArrayOutputStream;
52+
import java.io.IOException;
5253
import java.io.InputStream;
5354
import java.io.InputStreamReader;
5455
import java.net.InetAddress;
@@ -63,6 +64,7 @@
6364
import java.util.Objects;
6465
import java.util.concurrent.ConcurrentHashMap;
6566
import java.util.concurrent.TimeUnit;
67+
import java.util.concurrent.atomic.AtomicBoolean;
6668
import java.util.concurrent.atomic.AtomicInteger;
6769
import java.util.regex.Matcher;
6870
import java.util.regex.Pattern;
@@ -294,6 +296,44 @@ public void testWriteLargeBlob() throws Exception {
294296
assertThat(blocks.isEmpty(), is(true));
295297
}
296298

299+
public void testRetryUntilFail() throws IOException {
300+
final AtomicBoolean requestReceived = new AtomicBoolean(false);
301+
httpServer.createContext("/container/write_blob_max_retries", exchange -> {
302+
try {
303+
if (requestReceived.compareAndSet(false, true)) {
304+
throw new AssertionError("Should not receive two requests");
305+
} else {
306+
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
307+
}
308+
} finally {
309+
exchange.close();
310+
}
311+
});
312+
313+
final BlobContainer blobContainer = createBlobContainer(randomIntBetween(2, 5));
314+
try (InputStream stream = new InputStream() {
315+
316+
@Override
317+
public int read() throws IOException {
318+
throw new IOException("foo");
319+
}
320+
321+
@Override
322+
public boolean markSupported() {
323+
return true;
324+
}
325+
326+
@Override
327+
public void reset() {
328+
throw new AssertionError("should not be called");
329+
}
330+
}) {
331+
final IOException ioe = expectThrows(IOException.class, () ->
332+
blobContainer.writeBlob("write_blob_max_retries", stream, randomIntBetween(1, 128), randomBoolean()));
333+
assertThat(ioe.getMessage(), is("foo"));
334+
}
335+
}
336+
297337
private static byte[] randomBlobContent() {
298338
return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
299339
}

0 commit comments

Comments
 (0)