Skip to content

Commit b7ac5e3

Browse files
committed
Change the inefficient implementation of GoogleCloudStorageOutputStream to use an internal BufferedOutputStream with a configurable buffer_size
1 parent 69a40f8 commit b7ac5e3

File tree

8 files changed

+72
-29
lines changed

8 files changed

+72
-29
lines changed

src/main/java/org/elasticsearch/cloud/gce/GoogleCloudStorageService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ static public final class Fields {
3737
public static final String BUCKET = "bucket";
3838
public static final String BUCKET_LOCATION = "location";
3939
public static final String CHUNK_SIZE = "chunk_size";
40+
public static final String BUFFER_SIZE = "buffer_size";
4041
public static final String COMPRESS = "compress";
4142
public static final String BASE_PATH = "base_path";
4243
public static final String CREDENTIALS_FILE = "credentials_file";
@@ -104,9 +105,10 @@ static public final class Fields {
104105
* @param executor
105106
* @param bucketName
106107
* @param blobName
108+
* @param bufferSizeInBytes
107109
* @return
108110
*/
109-
OutputStream getOutputStream(Executor executor, String bucketName, String blobName) throws IOException;
111+
OutputStream getOutputStream(Executor executor, String bucketName, String blobName, int bufferSizeInBytes) throws IOException;
110112

111113
/**
112114
* List all blobs in a given bucket which have a prefix

src/main/java/org/elasticsearch/cloud/gce/GoogleCloudStorageServiceImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,11 +176,11 @@ public InputStream getInputStream(String bucketName, String blobName) throws IOE
176176
}
177177

178178
@Override
179-
public OutputStream getOutputStream(Executor executor, String bucketName, String blobName) throws IOException {
179+
public OutputStream getOutputStream(Executor executor, String bucketName, String blobName, int bufferSizeInBytes) throws IOException {
180180
// The concurrent upload does buffering internally
181181
ConcurrentUpload upload = prepareConcurrentUpload(bucketName, blobName);
182182
// ConcurrentUpload is executed in a dedicated thread
183-
return new GoogleCloudStorageOutputStream(executor, upload);
183+
return new GoogleCloudStorageOutputStream(executor, upload, bufferSizeInBytes);
184184
}
185185

186186
protected <T> T prepareConcurrentUpload(String bucketName, String blobName) {

src/main/java/org/elasticsearch/cloud/gce/blobstore/GoogleCloudStorageBlobContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public InputStream openInput(String blobName) throws IOException {
8686

8787
@Override
8888
public OutputStream createOutput(String blobName) throws IOException {
89-
return blobStore.client().getOutputStream(blobStore.executor(), blobStore.bucket(), buildKey(blobName));
89+
return blobStore.client().getOutputStream(blobStore.executor(), blobStore.bucket(), buildKey(blobName), blobStore.bufferSizeInBytes());
9090
}
9191

9292
@Override

src/main/java/org/elasticsearch/cloud/gce/blobstore/GoogleCloudStorageBlobStore.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.common.blobstore.BlobStore;
2828
import org.elasticsearch.common.component.AbstractComponent;
2929
import org.elasticsearch.common.settings.Settings;
30+
import org.elasticsearch.common.unit.ByteSizeValue;
3031

3132
import java.io.IOException;
3233
import java.nio.file.FileAlreadyExistsException;
@@ -45,11 +46,15 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
4546

4647
private final String bucket;
4748

48-
public GoogleCloudStorageBlobStore(Settings settings, Executor executor, GoogleCloudStorageService client, String projectName, String bucketName, String bucketLocation) throws IOException {
49+
private final ByteSizeValue bufferSize;
50+
51+
public GoogleCloudStorageBlobStore(Settings settings, Executor executor, GoogleCloudStorageService client, String projectName,
52+
String bucketName, String bucketLocation, ByteSizeValue bufferSize) throws IOException {
4953
super(settings);
5054
this.executor = executor;
5155
this.client = client;
5256
this.bucket = bucketName;
57+
this.bufferSize = bufferSize;
5358

5459
try {
5560
if (!client.doesBucketExist(bucketName)) {
@@ -83,6 +88,10 @@ public String bucket() {
8388
return bucket;
8489
}
8590

91+
public int bufferSizeInBytes() {
92+
return bufferSize.bytesAsInt();
93+
}
94+
8695
@Override
8796
public BlobContainer blobContainer(BlobPath path) {
8897
return new GoogleCloudStorageBlobContainer(path, this);

src/main/java/org/elasticsearch/cloud/gce/blobstore/GoogleCloudStorageOutputStream.java

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,33 @@
2121

2222
import org.elasticsearch.common.Preconditions;
2323

24-
import java.io.IOException;
25-
import java.io.OutputStream;
26-
import java.io.PipedInputStream;
27-
import java.io.PipedOutputStream;
24+
import java.io.*;
2825
import java.util.concurrent.Executor;
2926

3027

3128
public class GoogleCloudStorageOutputStream extends OutputStream {
3229

30+
/**
31+
* Wraps the PipedOutputStream in a BufferedOutputStream
32+
*/
33+
private BufferedOutputStream output;
34+
3335
/**
3436
* The PipedOutputStream is used by the caller to write data that are directly
3537
* piped to the PipedInputStream.
3638
*/
37-
private PipedOutputStream output = new PipedOutputStream();
39+
private PipedOutputStream pipedout;
3840

3941
/**
4042
* The PipedInputStream is used by a ConcurrentUpload object to read the data to send
4143
* to Google Cloud Storage.
4244
*/
43-
private PipedInputStream input;
45+
private PipedInputStream pipedin;
46+
47+
/**
48+
* Size in bytes used for both the BufferedOutputStream buffer and the PipedInputStream pipe buffer
49+
*/
50+
private int bufferSize;
4451

4552
/**
4653
* A ConcurrentUpload represents an upload request that is executed in the background.
@@ -53,32 +60,49 @@ public class GoogleCloudStorageOutputStream extends OutputStream {
5360
*/
5461
private Executor executor;
5562

56-
public GoogleCloudStorageOutputStream(Executor executor, ConcurrentUpload upload) {
63+
public GoogleCloudStorageOutputStream(Executor executor, ConcurrentUpload upload, int bufferSizeInBytes) {
5764
Preconditions.checkNotNull(executor, "An executor must be provided");
5865
this.executor = executor;
5966
Preconditions.checkNotNull(upload, "An upload request must be provided");
6067
this.upload = upload;
68+
this.bufferSize = bufferSizeInBytes;
69+
70+
pipedout = new PipedOutputStream();
71+
output = new BufferedOutputStream(pipedout, bufferSize);
6172
}
6273

63-
@Override
64-
public void write(int b) throws IOException {
65-
if (input == null) {
66-
// Connects output -> input
67-
input = new PipedInputStream(output);
74+
private void initialize() throws IOException {
75+
if (pipedin == null) {
76+
// Connects pipedout -> pipedin
77+
pipedin = new PipedInputStream(pipedout, bufferSize);
6878

69-
// Connects input -> concurrent upload
70-
upload.initializeUpload(input);
79+
// Connects pipedin -> concurrent upload
80+
upload.initializeUpload(pipedin);
7181

7282
// Starts the concurrent upload
7383
executor.execute(upload);
7484
}
85+
}
7586

76-
// Rethrow exception if something wrong happen
77-
checkForConcurrentUploadErrors();
87+
@Override
88+
public void write(int b) throws IOException {
89+
initialize();
7890

7991
output.write(b);
8092
}
8193

94+
@Override
95+
public void write(byte[] b, int off, int len) throws IOException {
96+
initialize();
97+
98+
output.write(b, off, len);
99+
}
100+
101+
@Override
102+
public void flush() throws IOException {
103+
output.flush();
104+
}
105+
82106
@Override
83107
public void close() throws IOException {
84108
if (output != null) {
@@ -89,7 +113,7 @@ public void close() throws IOException {
89113
}
90114
}
91115

92-
if (input != null) {
116+
if (pipedin != null) {
93117
try {
94118
// Waits for the upload request to complete
95119
upload.waitForCompletion();
@@ -99,7 +123,8 @@ public void close() throws IOException {
99123

100124
} finally {
101125
output = null;
102-
input = null;
126+
pipedout = null;
127+
pipedin = null;
103128
upload = null;
104129
}
105130
}

src/main/java/org/elasticsearch/repositories/gce/GoogleCloudStorageRepository.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class GoogleCloudStorageRepository extends BlobStoreRepository {
4949

5050
public final static String TYPE = "gcs";
5151

52+
public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB);
53+
5254
private final GoogleCloudStorageBlobStore blobStore;
5355

5456
private final BlobPath basePath;
@@ -105,11 +107,13 @@ public GoogleCloudStorageRepository(RepositoryName name, RepositorySettings repo
105107
}
106108

107109
int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5));
108-
ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 5, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[s3_stream]"));
110+
ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 5, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[gcs_stream]"));
111+
112+
ByteSizeValue bufferSize = repositorySettings.settings().getAsBytesSize(BUFFER_SIZE, componentSettings.getAsBytesSize(BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
109113

110114
logger.debug("using projet id [{}], bucket [{}], location [{}], base_path [{}], chunk_size [{}], compress [{}]",
111115
projectId, bucketName, bucketLocation, basePath, chunkSize, compress);
112-
this.blobStore = new GoogleCloudStorageBlobStore(settings, concurrentStreamPool, googleCloudStorageService, projectId, bucketName, bucketLocation);
116+
this.blobStore = new GoogleCloudStorageBlobStore(settings, concurrentStreamPool, googleCloudStorageService, projectId, bucketName, bucketLocation, bufferSize);
113117
}
114118

115119

src/test/java/org/elasticsearch/cloud/gce/blobstore/GoogleCloudStorageOutputStreamTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.cloud.gce.GoogleCloudStorageService;
2323
import org.elasticsearch.common.settings.ImmutableSettings;
2424
import org.elasticsearch.common.util.concurrent.EsExecutors;
25+
import org.elasticsearch.repositories.gce.GoogleCloudStorageRepository;
2526
import org.elasticsearch.repositories.gce.MockGoogleCloudStorageService;
2627
import org.elasticsearch.test.ElasticsearchTestCase;
2728
import org.junit.After;
@@ -46,6 +47,8 @@
4647
*/
4748
public class GoogleCloudStorageOutputStreamTest extends ElasticsearchTestCase {
4849

50+
private int bufferSize = GoogleCloudStorageRepository.DEFAULT_BUFFER_SIZE.bytesAsInt();
51+
4952
private ExecutorService executor;
5053

5154
@Before
@@ -67,7 +70,7 @@ public void tearDownExecutor() throws InterruptedException {
6770
@Test
6871
public void testWriteRandomDataToMockConcurrentUpload() throws IOException {
6972
ConcurrentUpload<byte[]> upload = new MockConcurrentUpload();
70-
GoogleCloudStorageOutputStream out = new GoogleCloudStorageOutputStream(executor, upload);
73+
GoogleCloudStorageOutputStream out = new GoogleCloudStorageOutputStream(executor, upload, bufferSize);
7174

7275
Integer randomLength = randomIntBetween(1, 10000000);
7376
ByteArrayOutputStream content = new ByteArrayOutputStream(randomLength);
@@ -90,7 +93,7 @@ public void testWriteRandomDataToGoogleCloudStorageConcurrentUpload() throws IOE
9093
GoogleCloudStorageService service = new MockGoogleCloudStorageService(ImmutableSettings.EMPTY, result);
9194

9295
GoogleCloudStorageConcurrentUpload upload = new GoogleCloudStorageConcurrentUpload(service, "test-bucket", "test-project");
93-
GoogleCloudStorageOutputStream out = new GoogleCloudStorageOutputStream(executor, upload);
96+
GoogleCloudStorageOutputStream out = new GoogleCloudStorageOutputStream(executor, upload, bufferSize);
9497

9598
Integer randomLength = randomIntBetween(1, 10000000);
9699
ByteArrayOutputStream content = new ByteArrayOutputStream(randomLength);
@@ -139,7 +142,7 @@ public void run() {
139142
try {
140143
GoogleCloudStorageService service = new MockGoogleCloudStorageService(ImmutableSettings.EMPTY, result);
141144
GoogleCloudStorageConcurrentUpload upload = new GoogleCloudStorageConcurrentUpload(service, "test-bucket", "test-blob-" + num);
142-
GoogleCloudStorageOutputStream out = new GoogleCloudStorageOutputStream(executor, upload);
145+
GoogleCloudStorageOutputStream out = new GoogleCloudStorageOutputStream(executor, upload, bufferSize);
143146

144147
for (int i = 0; i < randomLength; i++) {
145148
content.write(randomByte());

src/test/java/org/elasticsearch/repositories/gce/MockGoogleCloudStorageService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public InputStream getInputStream(String bucketName, String blobName) throws IOE
149149
}
150150

151151
@Override
152-
public OutputStream getOutputStream(Executor executor, String bucketName, String blobName) throws IOException {
152+
public OutputStream getOutputStream(Executor executor, String bucketName, String blobName, int bufferSize) throws IOException {
153153
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
154154
blobs.put(key(bucketName, blobName), outputStream);
155155
return outputStream;

0 commit comments

Comments
 (0)