Skip to content

Increase Azure client timeout on tests #67210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.regex.Pattern;

import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
Expand Down Expand Up @@ -124,8 +123,8 @@ AzureStorageService createAzureStorageService(Settings settings, AzureClientProv
@Override
RequestRetryOptions getRetryOptions(LocationMode locationMode, AzureStorageSettings azureStorageSettings) {
return new RequestRetryOptions(RetryPolicyType.EXPONENTIAL,
azureStorageSettings.getMaxRetries() + 1, 5,
1L, 15L, null);
azureStorageSettings.getMaxRetries() + 1, 60,
50L, 100L, null);
}

@Override
Expand Down Expand Up @@ -222,7 +221,6 @@ private boolean isPutBlockList(String request) {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/67119")
public void testLargeBlobCountDeletion() throws Exception {
int numberOfBlobs = randomIntBetween(257, 2000);
try (BlobStore store = newBlobStore()) {
Expand All @@ -234,7 +232,7 @@ public void testLargeBlobCountDeletion() throws Exception {
}

container.delete();
assertThat(container.listBlobs().size(), equalTo(0));
assertThat(container.listBlobs(), is(anEmptyMap()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

public class AzureBlobStore implements BlobStore {
private static final Logger logger = LogManager.getLogger(AzureBlobStore.class);
Expand Down Expand Up @@ -227,40 +228,41 @@ public DeleteResult deleteBlobDirectory(String path) throws IOException {
final AtomicInteger blobsDeleted = new AtomicInteger(0);
final AtomicLong bytesDeleted = new AtomicLong(0);

try {
final BlobServiceClient client = client();
SocketAccess.doPrivilegedVoidException(() -> {
final BlobContainerClient blobContainerClient = client.getBlobContainerClient(container);
final BlobContainerAsyncClient blobContainerAsyncClient = asyncClient().getBlobContainerAsyncClient(container);
final Queue<String> directories = new ArrayDeque<>();
directories.offer(path);
String directoryName;
List<Mono<Void>> deleteTasks = new ArrayList<>();
while ((directoryName = directories.poll()) != null) {
final BlobListDetails blobListDetails = new BlobListDetails()
.setRetrieveMetadata(true);

final ListBlobsOptions options = new ListBlobsOptions()
.setPrefix(directoryName)
.setDetails(blobListDetails);

for (BlobItem blobItem : blobContainerClient.listBlobsByHierarchy("/", options, null)) {
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
directories.offer(blobItem.getName());
} else {
BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(blobItem.getName());
deleteTasks.add(blobAsyncClient.delete());
bytesDeleted.addAndGet(blobItem.getProperties().getContentLength());
blobsDeleted.incrementAndGet();
}
final BlobServiceClient client = client();
SocketAccess.doPrivilegedVoidException(() -> {
final BlobContainerClient blobContainerClient = client.getBlobContainerClient(container);
final BlobContainerAsyncClient blobContainerAsyncClient = asyncClient().getBlobContainerAsyncClient(container);
final Queue<String> directories = new ArrayDeque<>();
directories.offer(path);
String directoryName;
List<Mono<Void>> deleteTasks = new ArrayList<>();
while ((directoryName = directories.poll()) != null) {
final BlobListDetails blobListDetails = new BlobListDetails()
.setRetrieveMetadata(true);

final ListBlobsOptions options = new ListBlobsOptions()
.setPrefix(directoryName)
.setDetails(blobListDetails);

for (BlobItem blobItem : blobContainerClient.listBlobsByHierarchy("/", options, null)) {
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
directories.offer(blobItem.getName());
} else {
BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(blobItem.getName());
final Mono<Void> deleteTask = blobAsyncClient.delete()
// Ignore not found blobs, as it's possible that due to network errors a request
// for an already deleted blob is retried, causing an error.
.onErrorResume(this::isNotFoundError, throwable -> Mono.empty())
.onErrorMap(throwable -> new IOException("Error deleting blob " + blobItem.getName(), throwable));
deleteTasks.add(deleteTask);
bytesDeleted.addAndGet(blobItem.getProperties().getContentLength());
blobsDeleted.incrementAndGet();
}
}
}

executeDeleteTasks(deleteTasks);
});
} catch (Exception e) {
throw new IOException("Deleting directory [" + path + "] failed", e);
}
executeDeleteTasks(deleteTasks, () -> "Deleting directory [" + path + "] failed");
});

return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
}
Expand All @@ -270,31 +272,43 @@ void deleteBlobList(List<String> blobs) throws IOException {
return;
}

try {
BlobServiceAsyncClient asyncClient = asyncClient();
SocketAccess.doPrivilegedVoidException(() -> {
List<Mono<Void>> deleteTasks = new ArrayList<>(blobs.size());
final BlobContainerAsyncClient blobContainerClient = asyncClient.getBlobContainerAsyncClient(container);
for (String blob : blobs) {
final Mono<Void> deleteTask = blobContainerClient.getBlobAsyncClient(blob)
.delete()
// Ignore not found blobs
.onErrorResume(e -> (e instanceof BlobStorageException) && ((BlobStorageException) e).getStatusCode() == 404,
throwable -> Mono.empty());
deleteTasks.add(deleteTask);
}
BlobServiceAsyncClient asyncClient = asyncClient();
SocketAccess.doPrivilegedVoidException(() -> {
List<Mono<Void>> deleteTasks = new ArrayList<>(blobs.size());
final BlobContainerAsyncClient blobContainerClient = asyncClient.getBlobContainerAsyncClient(container);
for (String blob : blobs) {
final Mono<Void> deleteTask = blobContainerClient.getBlobAsyncClient(blob)
.delete()
// Ignore not found blobs
.onErrorResume(this::isNotFoundError, throwable -> Mono.empty())
.onErrorMap(throwable -> new IOException("Error deleting blob " + blob, throwable));

deleteTasks.add(deleteTask);
}

executeDeleteTasks(deleteTasks);
});
} catch (Exception e) {
throw new IOException("Unable to delete blobs " + blobs, e);
}
executeDeleteTasks(deleteTasks, () -> "Unable to delete blobs " + blobs);
});
}

private void executeDeleteTasks(List<Mono<Void>> deleteTasks) {
// zipDelayError executes all tasks in parallel and delays
// error propagation until all tasks have finished.
Mono.zipDelayError(deleteTasks, results -> null).block();
private boolean isNotFoundError(Throwable e) {
return e instanceof BlobStorageException && ((BlobStorageException) e).getStatusCode() == 404;
}

private void executeDeleteTasks(List<Mono<Void>> deleteTasks, Supplier<String> errorMessageSupplier) throws IOException {
try {
// zipDelayError executes all tasks in parallel and delays
// error propagation until all tasks have finished.
Mono.zipDelayError(deleteTasks, results -> null).block();
} catch (Exception e) {
final IOException exception = new IOException(errorMessageSupplier.get());
for (Throwable suppressed : e.getSuppressed()) {
// We're only interested about the blob deletion exceptions and not in the reactor internals exceptions
if (suppressed instanceof IOException) {
exception.addSuppressed(suppressed);
}
}
throw exception;
}
}

public InputStream getInputStream(String blob, long position, final @Nullable Long length) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private BlobContainer createBlobContainer(final int maxRetries, String secondary
RequestRetryOptions getRetryOptions(LocationMode locationMode, AzureStorageSettings azureStorageSettings) {
return new RequestRetryOptions(RetryPolicyType.EXPONENTIAL,
maxRetries + 1,
1,
60,
50L,
100L,
// The SDK doesn't work well with ip endponts. Secondary host endpoints that contain
Expand Down