Skip to content

User proper write-once semantics for GCS repository #30438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ public InputStream readBlob(String blobName) throws IOException {

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
if (blobExists(blobName)) {
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
}
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.Storage.CopyRequest;
import com.google.cloud.storage.StorageException;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
Expand All @@ -47,12 +48,15 @@
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;

class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore {

// The recommended maximum size of a blob that should be uploaded in a single
Expand Down Expand Up @@ -204,24 +208,32 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize) throws I
* @param inputStream the stream containing the blob data
*/
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException {
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> storage.writer(blobInfo));
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
@Override
public boolean isOpen() {
return writeChannel.isOpen();
}
try {
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(
() -> storage.writer(blobInfo, Storage.BlobWriteOption.doesNotExist()));
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
@Override
public boolean isOpen() {
return writeChannel.isOpen();
}

@Override
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
}
@Override
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
}

@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int write(ByteBuffer src) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int write(ByteBuffer src) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
}
}));
} catch (StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
}));
throw se;
}
}

/**
Expand All @@ -238,7 +250,17 @@ private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long
assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method";
final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize));
Streams.copy(inputStream, baos);
SocketAccess.doPrivilegedVoidIOException(() -> storage.create(blobInfo, baos.toByteArray()));
SocketAccess.doPrivilegedVoidIOException(
() -> {
try {
storage.create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist());
} catch (StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
throw se;
}
});
}

/**
Expand Down Expand Up @@ -295,8 +317,8 @@ void deleteBlobs(Collection<String> blobNames) throws IOException {
/**
* Moves a blob within the same bucket
*
* @param sourceBlob name of the blob to move
* @param targetBlob new name of the blob in the same bucket
* @param sourceBlobName name of the blob to move
* @param targetBlobName new name of the blob in the same bucket
*/
void moveBlob(String sourceBlobName, String targetBlobName) throws IOException {
final BlobId sourceBlobId = BlobId.of(bucket, sourceBlobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* {@link MockStorage} mocks a {@link Storage} client by storing all the blobs
Expand Down Expand Up @@ -113,7 +114,14 @@ public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... option
if (bucketName.equals(blobInfo.getBucket()) == false) {
throw new StorageException(404, "Bucket not found");
}
blobs.put(blobInfo.getName(), content);
if (Stream.of(options).anyMatch(option -> option.equals(BlobTargetOption.doesNotExist()))) {
byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), content);
if (existingBytes != null) {
throw new StorageException(412, "Blob already exists");
}
} else {
blobs.put(blobInfo.getName(), content);
}
return get(BlobId.of(blobInfo.getBucket(), blobInfo.getName()));
}

Expand Down Expand Up @@ -243,9 +251,16 @@ public boolean isOpen() {
}

@Override
public void close() throws IOException {
public void close() {
IOUtils.closeWhileHandlingException(writableByteChannel);
blobs.put(blobInfo.getName(), output.toByteArray());
if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
if (existingBytes != null) {
throw new StorageException(412, "Blob already exists");
}
} else {
blobs.put(blobInfo.getName(), output.toByteArray());
}
}
};
}
Expand Down