Skip to content

Fix security manager bug writing large blobs to GCS #55421

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
31 changes: 28 additions & 3 deletions plugins/repository-gcs/qa/google-cloud-storage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.elasticsearch.gradle.MavenFilteringHack
import org.elasticsearch.gradle.info.BuildParams
import org.elasticsearch.gradle.test.RestIntegTestTask

import java.nio.file.Files
import java.security.KeyPair
Expand Down Expand Up @@ -117,9 +118,21 @@ task gcsThirdPartyTests {
dependsOn check
}

integTest.mustRunAfter(thirdPartyTest)
check.dependsOn thirdPartyTest

integTest.mustRunAfter(thirdPartyTest)

/*
* We only use a small amount of data in these tests, which means that the resumable upload path is not tested. We add
* an additional test that forces the large blob threshold to be small to exercise the resumable upload path.
*/
task largeBlobIntegTest(type: RestIntegTestTask) {
mustRunAfter(thirdPartyTest)
}

check.dependsOn integTest
check.dependsOn largeBlobIntegTest

Map<String, Object> expansions = [
'bucket' : gcsBucket,
'base_path': gcsBasePath + "_integration_tests"
Expand All @@ -130,11 +143,15 @@ processTestResources {
MavenFilteringHack.filter(it, expansions)
}

integTest {
final Closure integTestConfiguration = {
dependsOn project(':plugins:repository-gcs').bundlePlugin
}

testClusters.integTest {
integTest integTestConfiguration

largeBlobIntegTest integTestConfiguration

final Closure testClustersConfiguration = {
plugin project(':plugins:repository-gcs').bundlePlugin.archiveFile

keystore 'gcs.client.integration_test.credentials_file', serviceAccountFile, IGNORE_VALUE
Expand All @@ -148,3 +165,11 @@ testClusters.integTest {
println "Using an external service to test the repository-gcs plugin"
}
}

testClusters.integTest testClustersConfiguration

testClusters.largeBlobIntegTest testClustersConfiguration
testClusters.largeBlobIntegTest {
// set the large-blob threshold very small so uploads take the resumable (large blob) path, exercising it in tests
systemProperty 'es.repository_gcs.large_blob_threshold_byte_size', '256'
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.api.gax.paging.Page;
import com.google.cloud.BatchResult;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
Expand All @@ -41,11 +42,15 @@
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.internal.io.Streams;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -68,7 +73,29 @@ class GoogleCloudStorageBlobStore implements BlobStore {
// request. Larger files should be uploaded over multiple requests (this is
// called "resumable upload")
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
/**
 * The threshold, in bytes, above which blobs are uploaded via the resumable upload path instead of a
 * single request. Defaults to 5 MB; overridable via the system property
 * {@code es.repository_gcs.large_blob_threshold_byte_size} (used by tests to force the resumable path
 * with small payloads).
 */
public static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE;

static {
    final String key = "es.repository_gcs.large_blob_threshold_byte_size";
    final String largeBlobThresholdByteSizeProperty = System.getProperty(key);
    if (largeBlobThresholdByteSizeProperty == null) {
        // no override: use the default 5 MB threshold
        LARGE_BLOB_THRESHOLD_BYTE_SIZE = Math.toIntExact(new ByteSizeValue(5, ByteSizeUnit.MB).getBytes());
    } else {
        // an override was provided; it must parse as a positive integer number of bytes
        final int largeBlobThresholdByteSize;
        try {
            largeBlobThresholdByteSize = Integer.parseInt(largeBlobThresholdByteSizeProperty);
        } catch (final NumberFormatException e) {
            // preserve the cause so the original parse failure is not lost
            throw new IllegalArgumentException(
                "failed to parse " + key + " having value [" + largeBlobThresholdByteSizeProperty + "]", e);
        }
        if (largeBlobThresholdByteSize <= 0) {
            throw new IllegalArgumentException(key + " must be positive but was [" + largeBlobThresholdByteSizeProperty + "]");
        }
        LARGE_BLOB_THRESHOLD_BYTE_SIZE = largeBlobThresholdByteSize;
    }
}

private final String bucketName;
private final String clientName;
Expand Down Expand Up @@ -212,8 +239,29 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, bool
new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0];
for (int retry = 0; retry < 3; ++retry) {
try {
SocketAccess.doPrivilegedVoidIOException(() ->
Streams.copy(inputStream, Channels.newOutputStream(client().writer(blobInfo, writeOptions))));
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
/*
 * Wrapping only the call to Streams#copy in doPrivileged is not sufficient: each write and close on the
 * channel must be wrapped individually, because Streams#copy itself appears on the call stack and is not
 * granted the permissions needed to write to and close the channel.
 */
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {

@Override
public int write(final ByteBuffer src) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
}

@Override
public boolean isOpen() {
return writeChannel.isOpen();
}

@Override
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
}

}));
return;
} catch (final StorageException se) {
final int errorCode = se.getCode();
Expand Down