Skip to content

Merge AzureStorageService and AzureStorageServiceImpl and clean up tests #31607

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R

public AzureRepositoryPlugin(Settings settings) {
// eagerly load client settings so that secure settings are read
this.azureStoreService = new AzureStorageServiceImpl(settings);
this.azureStoreService = new AzureStorageService(settings);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,59 @@

package org.elasticsearch.repositories.azure;

import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.RetryPolicy;
import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobInputStream;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlobClient;

import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.security.InvalidKeyException;
import java.util.EnumSet;
import java.util.Map;
import java.util.function.Supplier;

/**
* Azure Storage Service interface
* @see AzureStorageServiceImpl for Azure REST API implementation
*/
public interface AzureStorageService {
import static java.util.Collections.emptyMap;

public class AzureStorageService extends AbstractComponent {

public static final ByteSizeValue MIN_CHUNK_SIZE = new ByteSizeValue(1, ByteSizeUnit.BYTES);
public static final ByteSizeValue MAX_CHUNK_SIZE = new ByteSizeValue(64, ByteSizeUnit.MB);

// 'package' for testing
volatile Map<String, AzureStorageSettings> storageSettings = emptyMap();

public AzureStorageService(Settings settings) {
super(settings);
// eagerly load client settings so that secure settings are read
final Map<String, AzureStorageSettings> clientsSettings = AzureStorageSettings.load(settings);
refreshAndClearCache(clientsSettings);
}

/**
* Creates a {@code CloudBlobClient} on each invocation using the current client
Expand All @@ -48,7 +80,46 @@ public interface AzureStorageService {
* thread for logically coupled ops. The {@code OperationContext} is used to
* specify the proxy, but a new context is *required* for each call.
*/
Tuple<CloudBlobClient, Supplier<OperationContext>> client(String clientName);
public Tuple<CloudBlobClient, Supplier<OperationContext>> client(String clientName) {
final AzureStorageSettings azureStorageSettings = this.storageSettings.get(clientName);
if (azureStorageSettings == null) {
throw new SettingsException("Unable to find client with name [" + clientName + "]");
}
try {
return new Tuple<>(buildClient(azureStorageSettings), () -> buildOperationContext(azureStorageSettings));
} catch (InvalidKeyException | URISyntaxException | IllegalArgumentException e) {
throw new SettingsException("Invalid azure client settings with name [" + clientName + "]", e);
}
}

protected CloudBlobClient buildClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException {
final CloudBlobClient client = createClient(azureStorageSettings);
// Set timeout option if the user sets cloud.azure.storage.timeout or
// cloud.azure.storage.xxx.timeout (it's negative by default)
final long timeout = azureStorageSettings.getTimeout().getMillis();
if (timeout > 0) {
if (timeout > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Timeout [" + azureStorageSettings.getTimeout() + "] exceeds 2,147,483,647ms.");
}
client.getDefaultRequestOptions().setTimeoutIntervalInMs((int) timeout);
}
// We define a default exponential retry policy
client.getDefaultRequestOptions()
.setRetryPolicyFactory(new RetryExponentialRetry(RetryPolicy.DEFAULT_CLIENT_BACKOFF, azureStorageSettings.getMaxRetries()));
client.getDefaultRequestOptions().setLocationMode(azureStorageSettings.getLocationMode());
return client;
}

protected CloudBlobClient createClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException {
final String connectionString = azureStorageSettings.buildConnectionString();
return CloudStorageAccount.parse(connectionString).createCloudBlobClient();
}

protected OperationContext buildOperationContext(AzureStorageSettings azureStorageSettings) {
final OperationContext context = new OperationContext();
context.setProxy(azureStorageSettings.getProxy());
return context;
}

/**
* Updates settings for building clients. Any client cache is cleared. Future
Expand All @@ -57,32 +128,134 @@ public interface AzureStorageService {
* @param clientsSettings the settings for new clients
* @return the old settings
*/
Map<String, AzureStorageSettings> refreshAndClearCache(Map<String, AzureStorageSettings> clientsSettings);

ByteSizeValue MIN_CHUNK_SIZE = new ByteSizeValue(1, ByteSizeUnit.BYTES);
ByteSizeValue MAX_CHUNK_SIZE = new ByteSizeValue(64, ByteSizeUnit.MB);

boolean doesContainerExist(String account, String container) throws URISyntaxException, StorageException;
public Map<String, AzureStorageSettings> refreshAndClearCache(Map<String, AzureStorageSettings> clientsSettings) {
final Map<String, AzureStorageSettings> prevSettings = this.storageSettings;
this.storageSettings = MapBuilder.newMapBuilder(clientsSettings).immutableMap();
// clients are built lazily by {@link client(String)}
return prevSettings;
}

void removeContainer(String account, String container) throws URISyntaxException, StorageException;
public boolean doesContainerExist(String account, String container) throws URISyntaxException, StorageException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
return SocketAccess.doPrivilegedException(() -> blobContainer.exists(null, null, client.v2().get()));
}

void createContainer(String account, String container) throws URISyntaxException, StorageException;
public void deleteFiles(String account, String container, String path) throws URISyntaxException, StorageException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
// container name must be lower case.
logger.trace(() -> new ParameterizedMessage("delete files container [{}], path [{}]", container, path));
SocketAccess.doPrivilegedVoidException(() -> {
// list the blobs using a flat blob listing mode
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true, EnumSet.noneOf(BlobListingDetails.class), null,
client.v2().get())) {
final String blobName = blobNameFromUri(blobItem.getUri());
logger.trace(() -> new ParameterizedMessage("removing blob [{}] full URI was [{}]", blobName, blobItem.getUri()));
// don't call {@code #deleteBlob}, use the same client
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blobName);
azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
}
});
}

void deleteFiles(String account, String container, String path) throws URISyntaxException, StorageException;
/**
* Extract the blob name from a URI like https://myservice.azure.net/container/path/to/myfile
* It should remove the container part (first part of the path) and gives path/to/myfile
* @param uri URI to parse
* @return The blob name relative to the container
*/
static String blobNameFromUri(URI uri) {
final String path = uri.getPath();
// We remove the container name from the path
// The 3 magic number cames from the fact if path is /container/path/to/myfile
// First occurrence is empty "/"
// Second occurrence is "container
// Last part contains "path/to/myfile" which is what we want to get
final String[] splits = path.split("/", 3);
// We return the remaining end of the string
return splits[2];
}

boolean blobExists(String account, String container, String blob) throws URISyntaxException, StorageException;
public boolean blobExists(String account, String container, String blob) throws URISyntaxException, StorageException {
// Container name must be lower case.
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
return SocketAccess.doPrivilegedException(() -> {
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
return azureBlob.exists(null, null, client.v2().get());
});
}

void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException;
public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
// Container name must be lower case.
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
logger.trace(() -> new ParameterizedMessage("delete blob for container [{}], blob [{}]", container, blob));
SocketAccess.doPrivilegedVoidException(() -> {
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blob));
azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
});
}

InputStream getInputStream(String account, String container, String blob) throws URISyntaxException, StorageException, IOException;
public InputStream getInputStream(String account, String container, String blob)
throws URISyntaxException, StorageException, IOException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlockBlob blockBlobReference = client.v1().getContainerReference(container).getBlockBlobReference(blob);
logger.trace(() -> new ParameterizedMessage("reading container [{}], blob [{}]", container, blob));
final BlobInputStream is = SocketAccess.doPrivilegedException(() ->
blockBlobReference.openInputStream(null, null, client.v2().get()));
return giveSocketPermissionsToStream(is);
}

Map<String, BlobMetaData> listBlobsByPrefix(String account, String container, String keyPath, String prefix)
throws URISyntaxException, StorageException;
public Map<String, BlobMetaData> listBlobsByPrefix(String account, String container, String keyPath, String prefix)
throws URISyntaxException, StorageException {
// NOTE: this should be here: if (prefix == null) prefix = "";
// however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
// then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!
final MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
logger.trace(() -> new ParameterizedMessage("listing container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix));
SocketAccess.doPrivilegedVoidException(() -> {
for (final ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix), false,
enumBlobListingDetails, null, client.v2().get())) {
final URI uri = blobItem.getUri();
logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
final String blobPath = uri.getPath().substring(1 + container.length() + 1);
final BlobProperties properties = ((CloudBlockBlob) blobItem).getProperties();
final String name = blobPath.substring(keyPath.length());
logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength()));
blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength()));
}
});
return blobsBuilder.immutableMap();
}

void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize)
throws URISyntaxException, StorageException, FileAlreadyExistsException;
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
try {
SocketAccess.doPrivilegedVoidException(() ->
blob.upload(inputStream, blobSize, AccessCondition.generateIfNotExistsCondition(), null, client.v2().get()));
} catch (final StorageException se) {
if (se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
throw new FileAlreadyExistsException(blobName, null, se.getMessage());
}
throw se;
}
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize));
}

static InputStream giveSocketPermissionsToStream(InputStream stream) {
static InputStream giveSocketPermissionsToStream(final InputStream stream) {
return new InputStream() {
@Override
public int read() throws IOException {
Expand Down
Loading