diff --git a/plugins/repository-gcs/qa/google-cloud-storage/build.gradle b/plugins/repository-gcs/qa/google-cloud-storage/build.gradle
index d70c1921908f5..aef065cc8d92b 100644
--- a/plugins/repository-gcs/qa/google-cloud-storage/build.gradle
+++ b/plugins/repository-gcs/qa/google-cloud-storage/build.gradle
@@ -20,6 +20,7 @@
 
 import org.elasticsearch.gradle.MavenFilteringHack
 import org.elasticsearch.gradle.info.BuildParams
+import org.elasticsearch.gradle.test.RestIntegTestTask
 
 import java.nio.file.Files
 import java.security.KeyPair
@@ -117,9 +118,21 @@ task gcsThirdPartyTests {
   dependsOn check
 }
 
-integTest.mustRunAfter(thirdPartyTest)
 check.dependsOn thirdPartyTest
+integTest.mustRunAfter(thirdPartyTest)
+
+/*
+ * We only use a small amount of data in these tests, which means that the resumable upload path is not tested. We add
+ * an additional test that forces the large blob threshold to be small to exercise the resumable upload path.
+ */
+task largeBlobIntegTest(type: RestIntegTestTask) {
+  mustRunAfter(thirdPartyTest)
+}
+
+check.dependsOn integTest
+check.dependsOn largeBlobIntegTest
+
 Map expansions = [
   'bucket'   : gcsBucket,
   'base_path': gcsBasePath + "_integration_tests"
@@ -130,11 +143,15 @@ processTestResources {
   MavenFilteringHack.filter(it, expansions)
 }
 
-integTest {
+final Closure integTestConfiguration = {
   dependsOn project(':plugins:repository-gcs').bundlePlugin
 }
 
-testClusters.integTest {
+integTest integTestConfiguration
+
+largeBlobIntegTest integTestConfiguration
+
+final Closure testClustersConfiguration = {
   plugin project(':plugins:repository-gcs').bundlePlugin.archiveFile
 
   keystore 'gcs.client.integration_test.credentials_file', serviceAccountFile, IGNORE_VALUE
@@ -148,3 +165,11 @@ testClusters.integTest {
     println "Using an external service to test the repository-gcs plugin"
   }
 }
+
+testClusters.integTest testClustersConfiguration
+
+testClusters.largeBlobIntegTest testClustersConfiguration
+testClusters.largeBlobIntegTest {
+  // force large blob uploads by setting the threshold small, forcing this code path to be tested
+  systemProperty 'es.repository_gcs.large_blob_threshold_byte_size', '256'
+}
diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
index 134c9fc3519fc..47b942de10da2 100644
--- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
+++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
@@ -21,6 +21,7 @@
 
 import com.google.api.gax.paging.Page;
 import com.google.cloud.BatchResult;
+import com.google.cloud.WriteChannel;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
@@ -33,6 +34,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
 import org.elasticsearch.common.blobstore.BlobPath;
@@ -41,11 +43,15 @@
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
 import org.elasticsearch.common.collect.MapBuilder;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.internal.io.Streams;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.nio.file.FileAlreadyExistsException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -68,7 +74,26 @@ class GoogleCloudStorageBlobStore implements BlobStore {
     // request. Larger files should be uploaded over multiple requests (this is
     // called "resumable upload")
     // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
-    public static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    public static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE;
+
+    static {
+        final String key = "es.repository_gcs.large_blob_threshold_byte_size";
+        final String largeBlobThresholdByteSizeProperty = System.getProperty(key);
+        if (largeBlobThresholdByteSizeProperty == null) {
+            LARGE_BLOB_THRESHOLD_BYTE_SIZE = Math.toIntExact(new ByteSizeValue(5, ByteSizeUnit.MB).getBytes());
+        } else {
+            final int largeBlobThresholdByteSize;
+            try {
+                largeBlobThresholdByteSize = Integer.parseInt(largeBlobThresholdByteSizeProperty);
+            } catch (final NumberFormatException e) {
+                throw new IllegalArgumentException("failed to parse " + key + " having value [" + largeBlobThresholdByteSizeProperty + "]");
+            }
+            if (largeBlobThresholdByteSize <= 0) {
+                throw new IllegalArgumentException(key + " must be positive but was [" + largeBlobThresholdByteSizeProperty + "]");
+            }
+            LARGE_BLOB_THRESHOLD_BYTE_SIZE = largeBlobThresholdByteSize;
+        }
+    }
 
     private final String bucketName;
     private final String clientName;
@@ -212,8 +237,30 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, bool
             new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0];
         for (int retry = 0; retry < 3; ++retry) {
             try {
-                SocketAccess.doPrivilegedVoidIOException(() ->
-                    Streams.copy(inputStream, Channels.newOutputStream(client().writer(blobInfo, writeOptions))));
+                final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
+                /*
+                 * It is not enough to wrap the call to Streams#copy, we have to wrap the privileged calls too; this is because Streams#copy
+                 * is in the stacktrace and is not granted the permissions needed to close and write the channel.
+                 */
+                Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
+
+                    @SuppressForbidden(reason = "channel is based on a socket")
+                    @Override
+                    public int write(final ByteBuffer src) throws IOException {
+                        return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
+                    }
+
+                    @Override
+                    public boolean isOpen() {
+                        return writeChannel.isOpen();
+                    }
+
+                    @Override
+                    public void close() throws IOException {
+                        SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
+                    }
+
+                }));
                 return;
            } catch (final StorageException se) {
                final int errorCode = se.getCode();
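Context for the comment in writeBlobResumable about wrapping the privileged calls: doPrivileged only truncates the permission-check stack walk at the frame that calls it, so code invoked inside the privileged action is still checked. With the old code, Streams#copy ran inside the privileged block and, since it is not granted the socket permissions needed to write and close the channel, the check could fail; moving the privileged call inside each channel operation keeps Streams#copy below the doPrivileged frame. The following is a minimal standalone sketch of that delegation pattern under the SecurityManager model, not the patched code: it uses AccessController directly in place of Elasticsearch's SocketAccess helper, and the class name PrivilegedWritableByteChannel is purely illustrative.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;

/**
 * Illustrative sketch only: delegates each channel operation to a privileged block so that
 * callers such as Streams#copy never sit between the permission check and the doPrivileged frame.
 */
final class PrivilegedWritableByteChannel implements WritableByteChannel {

    private final WritableByteChannel delegate;

    PrivilegedWritableByteChannel(final WritableByteChannel delegate) {
        this.delegate = delegate;
    }

    @Override
    public int write(final ByteBuffer src) throws IOException {
        // only frames above this doPrivileged call (this wrapper and the delegate) are permission-checked
        return doPrivileged(() -> delegate.write(src));
    }

    @Override
    public boolean isOpen() {
        return delegate.isOpen();
    }

    @Override
    public void close() throws IOException {
        doPrivileged(() -> {
            delegate.close();
            return null;
        });
    }

    private static <T> T doPrivileged(final PrivilegedExceptionAction<T> action) throws IOException {
        try {
            return AccessController.doPrivileged(action);
        } catch (final PrivilegedActionException e) {
            // the actions above only throw IOException, so the cause can be rethrown as such
            throw (IOException) e.getCause();
        }
    }
}

In the actual change, SocketAccess#doPrivilegedIOException and SocketAccess#doPrivilegedVoidIOException play the role of the doPrivileged helper shown here.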