Skip to content

Enable Azure snapshot plugin to support taking snapshot into multiple storage accounts. #22709

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;

Expand All @@ -51,6 +52,44 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore {
private final String container;
private final String repositoryName;

private String[] getAccounts() {
if (Strings.isNullOrEmpty(this.accountName ))
return new String[0];

return this.accountName.split(",");
}

private String getAccount(String blob) {
final String[] accounts = this.getAccounts();
if (accounts.length == 0) {
return null;
}
int hash = this.getAccountHash(blob, accounts.length);
return accounts[hash];
}

private int getAccountHash(String blob, int numberOfAccounts) {
int hash = this.hashCode(blob);
return Math.abs(hash % numberOfAccounts);
}

/**
* Returns a hash code for this blob name.
*
* @return a hash code value for this blob name.
*/
private int hashCode(String blob) {
if (Strings.isNullOrEmpty(blob))
return 0;

int hash = 0;
final char chars[] = blob.toCharArray();
for (char ch : chars) {
hash += ch;
}
return hash;
}

public AzureBlobStore(RepositoryMetaData metadata, Settings settings,
AzureStorageService client) throws URISyntaxException, StorageException {
super(settings);
Expand Down Expand Up @@ -84,8 +123,16 @@ public BlobContainer blobContainer(BlobPath path) {
@Override
public void delete(BlobPath path) {
String keyPath = path.buildAsString();
final String[] accounts = this.getAccounts();
try {
this.client.deleteFiles(this.accountName, this.locMode, container, keyPath);
if (accounts.length == 0) {
this.client.deleteFiles(null, this.locMode, container, keyPath);
}else{
for (String account : accounts)
{
this.client.deleteFiles(account, this.locMode, container, keyPath);
}
}
} catch (URISyntaxException | StorageException e) {
logger.warn("can not remove [{}] in container {{}}: {}", keyPath, container, e.getMessage());
}
Expand All @@ -97,17 +144,44 @@ public void close() {

public boolean doesContainerExist(String container)
{
return this.client.doesContainerExist(this.accountName, this.locMode, container);
final String[] accounts = this.getAccounts();
if (accounts.length == 0)
{
return this.client.doesContainerExist(null, this.locMode, container);
}else{
for (String account : accounts)
{
if (!this.client.doesContainerExist(account, this.locMode, container)) {
return false;
}
}
}

return true;
}

public void removeContainer(String container) throws URISyntaxException, StorageException
{
this.client.removeContainer(this.accountName, this.locMode, container);
final String[] accounts = this.getAccounts();
if (accounts.length == 0){
this.client.removeContainer(null, this.locMode, container);
}else{
for (String account : accounts) {
this.client.removeContainer(account, this.locMode, container);
}
}
}

public void createContainer(String container) throws URISyntaxException, StorageException
{
this.client.createContainer(this.accountName, this.locMode, container);
final String[] accounts = this.getAccounts();
if (accounts.length == 0) {
this.client.createContainer(null, this.locMode, container);
}else{
for (String account : accounts) {
this.client.createContainer(account, this.locMode, container);
}
}
}

public void deleteFiles(String container, String path) throws URISyntaxException, StorageException
Expand All @@ -117,31 +191,48 @@ public void deleteFiles(String container, String path) throws URISyntaxException

public boolean blobExists(String container, String blob) throws URISyntaxException, StorageException
{
return this.client.blobExists(this.accountName, this.locMode, container, blob);
String account = this.getAccount(blob);
return this.client.blobExists(account, this.locMode, container, blob);
}

public void deleteBlob(String container, String blob) throws URISyntaxException, StorageException
{
this.client.deleteBlob(this.accountName, this.locMode, container, blob);
String account = this.getAccount(blob);
this.client.deleteBlob(account, this.locMode, container, blob);
}

public InputStream getInputStream(String container, String blob) throws URISyntaxException, StorageException, IOException
{
return this.client.getInputStream(this.accountName, this.locMode, container, blob);
String account = this.getAccount(blob);
return this.client.getInputStream(account, this.locMode, container, blob);
}

public OutputStream getOutputStream(String container, String blob) throws URISyntaxException, StorageException
{
return this.client.getOutputStream(this.accountName, this.locMode, container, blob);
String account = this.getAccount(blob);
return this.client.getOutputStream(account, this.locMode, container, blob);
}

public Map<String,BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix) throws URISyntaxException, StorageException
{
return this.client.listBlobsByPrefix(this.accountName, this.locMode, container, keyPath, prefix);
MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();

final String[] accounts = this.getAccounts();
if (accounts.length == 0) {
return this.client.listBlobsByPrefix(null, this.locMode, container, keyPath, prefix);
} else {
for (String account : accounts) {
Map<String, BlobMetaData> blobs = this.client.listBlobsByPrefix(account, this.locMode, container, keyPath, prefix);
blobsBuilder.putAll(blobs);
}
}
return blobsBuilder.immutableMap();
}

public void moveBlob(String container, String sourceBlob, String targetBlob) throws URISyntaxException, StorageException
{
this.client.moveBlob(this.accountName, this.locMode, container, sourceBlob, targetBlob);
String sourceAccount = this.getAccount(sourceBlob);
String targetAccount = this.getAccount(targetBlob);
this.client.moveBlob(sourceAccount, targetAccount,this.locMode, container, sourceBlob, targetBlob);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,6 @@ OutputStream getOutputStream(String account, LocationMode mode, String container
Map<String,BlobMetaData> listBlobsByPrefix(String account, LocationMode mode, String container, String keyPath, String prefix)
throws URISyntaxException, StorageException;

void moveBlob(String account, LocationMode mode, String container, String sourceBlob, String targetBlob)
void moveBlob(String account, String targetAccount, LocationMode mode, String container, String sourceBlob, String targetBlob)
throws URISyntaxException, StorageException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlobClient;
Expand All @@ -47,26 +48,26 @@

public class AzureStorageServiceImpl extends AbstractComponent implements AzureStorageService {

final AzureStorageSettings primaryStorageSettings;
final Map<String, AzureStorageSettings> primariesStorageSettings;
final Map<String, AzureStorageSettings> secondariesStorageSettings;

final Map<String, CloudBlobClient> clients;

public AzureStorageServiceImpl(Settings settings) {
super(settings);

Tuple<AzureStorageSettings, Map<String, AzureStorageSettings>> storageSettings = AzureStorageSettings.parse(settings);
this.primaryStorageSettings = storageSettings.v1();
Tuple<Map<String, AzureStorageSettings>, Map<String, AzureStorageSettings>> storageSettings = AzureStorageSettings.parse(settings);
this.primariesStorageSettings = storageSettings.v1();
this.secondariesStorageSettings = storageSettings.v2();

this.clients = new HashMap<>();

logger.debug("starting azure storage client instance");

// We register the primary client if any
if (primaryStorageSettings != null) {
logger.debug("registering primary client for account [{}]", primaryStorageSettings.getAccount());
createClient(primaryStorageSettings);
// We register all primary clients
for (Map.Entry<String, AzureStorageSettings> azureStorageSettingsEntry : primariesStorageSettings.entrySet()) {
logger.debug("registering primary client for account [{}]", azureStorageSettingsEntry.getKey());
createClient(azureStorageSettingsEntry.getValue());
}

// We register all secondary clients
Expand All @@ -81,6 +82,13 @@ void createClient(AzureStorageSettings azureStorageSettings) {
logger.trace("creating new Azure storage client using account [{}], key [{}]",
azureStorageSettings.getAccount(), azureStorageSettings.getKey());

if (this.clients.containsKey(azureStorageSettings.getAccount()))
{
logger.trace("Azure storage client using account [{}], key [{}] exists.",
azureStorageSettings.getAccount(), azureStorageSettings.getKey());
return;
}

String storageConnectionString =
"DefaultEndpointsProtocol=https;"
+ "AccountName="+ azureStorageSettings.getAccount() +";"
Expand All @@ -103,7 +111,7 @@ CloudBlobClient getSelectedClient(String account, LocationMode mode) {
logger.trace("selecting a client for account [{}], mode [{}]", account, mode.name());
AzureStorageSettings azureStorageSettings = null;

if (this.primaryStorageSettings == null) {
if (this.primariesStorageSettings == null) {
throw new IllegalArgumentException("No primary azure storage can be found. Check your elasticsearch.yml.");
}

Expand All @@ -113,8 +121,8 @@ CloudBlobClient getSelectedClient(String account, LocationMode mode) {

// if account is not secondary, it's the primary
if (azureStorageSettings == null) {
if (Strings.hasLength(account) == false || primaryStorageSettings.getName() == null || account.equals(primaryStorageSettings.getName())) {
azureStorageSettings = primaryStorageSettings;
if (Strings.hasLength(account)) {
azureStorageSettings = this.primariesStorageSettings.get(account);
}
}

Expand All @@ -138,6 +146,7 @@ CloudBlobClient getSelectedClient(String account, LocationMode mode) {
try {
int timeout = (int) azureStorageSettings.getTimeout().getMillis();
client.getDefaultRequestOptions().setTimeoutIntervalInMs(timeout);
client.getDefaultRequestOptions().setRetryPolicyFactory(new RetryExponentialRetry(1000 * 30, 7));
} catch (ClassCastException e) {
throw new IllegalArgumentException("Can not convert [" + azureStorageSettings.getTimeout() +
"]. It can not be longer than 2,147,483,647ms.");
Expand Down Expand Up @@ -294,14 +303,19 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, LocationMode
}

@Override
public void moveBlob(String account, LocationMode mode, String container, String sourceBlob, String targetBlob) throws URISyntaxException, StorageException {
public void moveBlob(String account, String targetAccount, LocationMode mode, String container, String sourceBlob, String targetBlob) throws URISyntaxException, StorageException {
logger.debug("moveBlob container [{}], sourceBlob [{}], targetBlob [{}]", container, sourceBlob, targetBlob);

CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blobContainer = client.getContainerReference(container);
CloudBlockBlob blobSource = blobContainer.getBlockBlobReference(sourceBlob);

CloudBlobClient targetClient = this.getSelectedClient(targetAccount, mode);
CloudBlobContainer targetBlobContainer = targetClient.getContainerReference(container);
targetBlobContainer.createIfNotExists();

if (blobSource.exists()) {
CloudBlockBlob blobTarget = blobContainer.getBlockBlobReference(targetBlob);
CloudBlockBlob blobTarget = targetBlobContainer.getBlockBlobReference(targetBlob);
blobTarget.startCopy(blobSource);
blobSource.delete();
logger.debug("moveBlob container [{}], sourceBlob [{}], targetBlob [{}] -> done", container, sourceBlob, targetBlob);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ public String toString() {
* @param settings settings to parse
* @return A tuple with v1 = primary storage and v2 = secondary storage
*/
public static Tuple<AzureStorageSettings, Map<String, AzureStorageSettings>> parse(Settings settings) {
public static Tuple<Map<String, AzureStorageSettings>, Map<String, AzureStorageSettings>> parse(Settings settings) {
List<AzureStorageSettings> storageSettings = createStorageSettings(settings);
return Tuple.tuple(getPrimary(storageSettings), getSecondaries(storageSettings));
return Tuple.tuple(getPrimaries(storageSettings), getSecondaries(storageSettings));
}

private static List<AzureStorageSettings> createStorageSettings(Settings settings) {
Expand All @@ -133,29 +133,26 @@ private static <T> T getValue(Settings settings, String groupName, Setting<T> se
return setting.getConcreteSetting(fullKey).get(settings);
}

private static AzureStorageSettings getPrimary(List<AzureStorageSettings> settings) {
private static Map<String, AzureStorageSettings> getPrimaries(List<AzureStorageSettings> settings) {
Map<String, AzureStorageSettings> primaries = new HashMap<>();

if (settings.isEmpty()) {
return null;
} else if (settings.size() == 1) {
// the only storage settings belong (implicitly) to the default primary storage
AzureStorageSettings storage = settings.get(0);
return new AzureStorageSettings(storage.getName(), storage.getAccount(), storage.getKey(), storage.getTimeout(), true);
primaries.put(storage.getName(), new AzureStorageSettings(storage.getName(), storage.getAccount(), storage.getKey(), storage.getTimeout(), true));
} else {
AzureStorageSettings primary = null;
for (AzureStorageSettings setting : settings) {
if (setting.isActiveByDefault()) {
if (primary == null) {
primary = setting;
} else {
throw new SettingsException("Multiple default Azure data stores configured: [" + primary.getName() + "] and [" + setting.getName() + "]");
}
primaries.put(setting.getName(), setting);
}
}
if (primary == null) {
if (primaries.size() == 0) {
throw new SettingsException("No default Azure data store configured");
}
return primary;
}
return Collections.unmodifiableMap(primaries);
}

private static Map<String, AzureStorageSettings> getSecondaries(List<AzureStorageSettings> settings) {
Expand Down