Commit 4bc4ea6

BlobStore: BlobContainer interface changed in elasticsearch 1.4.0
Adds an S3OutputStream that uploads blobs to the S3 storage service in two modes (single/multipart). When the length of the chunk is lower than buffer_size (default 5mb), the chunk is uploaded with a single request. Otherwise multiple requests are made, each of buffer_size (except the last one, which can be smaller than buffer_size). For example, when uploading a 1Gb blob with chunk_size set to accept large chunks (chunk_size = 5Gb) and buffer_size set to 100Mb, the blob is sent as 10 parts of ~100Mb each. Each part upload may fail independently and is retried up to 3 times. Closes #117
1 parent 039a793 commit 4bc4ea6
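
For illustration only (not code from this commit), a minimal Java sketch of the arithmetic behind the single/multipart decision described above; chunkLength and bufferSize are hypothetical values taken from the example:

public class UploadModeSketch {
    public static void main(String[] args) {
        long chunkLength = 1000L * 1024 * 1024; // a ~1Gb chunk, reading 1Gb as ~1000Mb per the example
        long bufferSize = 100L * 1024 * 1024;   // buffer_size = 100Mb

        if (chunkLength < bufferSize) {
            // below the threshold: a single PUT request
            System.out.println("single request upload");
        } else {
            // at or above the threshold: multipart upload, parts of bufferSize each,
            // the last part may be smaller
            long parts = (chunkLength + bufferSize - 1) / bufferSize;
            System.out.println(parts + " part uploads"); // prints "10 part uploads"
        }
    }
}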

File tree: 13 files changed (+649 / -157 lines)
README.md

Lines changed: 1 addition & 0 deletions
@@ -158,6 +158,7 @@ The following settings are supported:
  * `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by using size value notation, i.e. `1g`, `10m`, `5k`. Defaults to `100m`.
  * `compress`: When set to `true` metadata files are stored in compressed format. This setting doesn't affect index files that are already compressed by default. Defaults to `false`.
  * `server_side_encryption`: When set to `true` files are encrypted on server side using AES256 algorithm. Defaults to `false`.
+ * `buffer_size`: Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold, the S3 repository will use the [AWS Multipart Upload API](http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html) to split the chunk into several parts, each of `buffer_size` length, and to upload each part in its own request. Note that setting a buffer size lower than `5mb` is not allowed since it prevents the use of the Multipart API and may result in upload errors. Defaults to `5mb`.
  * `max_retries`: Number of retries in case of S3 errors. Defaults to `3`.

  The S3 repositories are using the same credentials as the rest of the AWS services provided by this plugin (`discovery`).
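
As a hedged illustration of these settings (not part of this commit), registering an S3 repository that uses the new `buffer_size` option from the Java client of that era might look roughly like this; the repository and bucket names are made up, and the ImmutableSettings builder is assumed from the 1.x client API:

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;

public class RegisterS3RepositorySketch {
    // Hypothetical snapshot-repository registration; names are illustrative.
    public static void register(Client client) {
        client.admin().cluster().preparePutRepository("my_s3_repository")
                .setType("s3")
                .setSettings(ImmutableSettings.settingsBuilder()
                        .put("bucket", "my-bucket")
                        .put("chunk_size", "100m")  // snapshot chunking
                        .put("buffer_size", "5mb")  // single-request threshold; 5mb is the minimum
                        .put("max_retries", 3)
                        .build())
                .get();
    }
}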

pom.xml

Lines changed: 2 additions & 1 deletion
@@ -34,6 +34,7 @@
     <properties>
         <elasticsearch.version>2.0.0-SNAPSHOT</elasticsearch.version>
         <lucene.version>4.10.1</lucene.version>
+        <amazonaws.version>1.7.13</amazonaws.version>
         <tests.output>onerror</tests.output>
         <tests.shuffle>true</tests.shuffle>
         <tests.output>onerror</tests.output>
@@ -85,7 +86,7 @@
         <dependency>
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk</artifactId>
-            <version>1.7.13</version>
+            <version>${amazonaws.version}</version>
             <scope>compile</scope>
             <exclusions>
                 <!-- jackson is optional -->

src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java

Lines changed: 4 additions & 0 deletions
@@ -22,6 +22,7 @@
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.Protocol;
 import com.amazonaws.auth.*;
+import com.amazonaws.http.IdleConnectionReaper;
 import com.amazonaws.internal.StaticCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3Client;
@@ -192,5 +193,8 @@ protected void doClose() throws ElasticsearchException {
         for (AmazonS3Client client : clients.values()) {
             client.shutdown();
         }
+
+        // Ensure that IdleConnectionReaper is shutdown
+        IdleConnectionReaper.shutdown();
     }
 }
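
A minimal sketch of the shutdown ordering added above, assuming already-constructed clients; com.amazonaws.http.IdleConnectionReaper.shutdown() stops the SDK-wide connection-reaper thread, which AmazonS3Client.shutdown() by itself does not:

import com.amazonaws.http.IdleConnectionReaper;
import com.amazonaws.services.s3.AmazonS3Client;

public class ClientShutdownSketch {
    // Illustrative shutdown ordering mirroring doClose() above.
    public static void closeAll(Iterable<AmazonS3Client> clients) {
        for (AmazonS3Client client : clients) {
            client.shutdown();           // releases each client's own resources
        }
        IdleConnectionReaper.shutdown(); // stops the shared reaper thread
    }
}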
src/main/java/org/elasticsearch/cloud/aws/blobstore/DefaultS3OutputStream.java

Lines changed: 225 additions & 0 deletions
@@ -0,0 +1,225 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cloud.aws.blobstore;
+
+import com.amazonaws.services.s3.model.*;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * DefaultS3OutputStream uploads data to the AWS S3 service using 2 modes: single and multi part.
+ * <p/>
+ * When the length of the chunk is lower than buffer_size, the chunk is uploaded with a single request.
+ * Otherwise multiple requests are made, each of buffer_size (except the last one which can be lower than buffer_size).
+ * <p/>
+ * Quick facts about S3:
+ * <p/>
+ * Maximum object size:                 5 TB
+ * Maximum number of parts per upload:  10,000
+ * Part numbers:                        1 to 10,000 (inclusive)
+ * Part size:                           5 MB to 5 GB, last part can be < 5 MB
+ * <p/>
+ * See http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
+ * See http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html
+ */
+public class DefaultS3OutputStream extends S3OutputStream {
+
+    private static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB);
+
+    /**
+     * Multipart Upload API data
+     */
+    private String multipartId;
+    private int multipartChunks;
+    private List<PartETag> multiparts;
+
+    public DefaultS3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, int numberOfRetries, boolean serverSideEncryption) {
+        super(blobStore, bucketName, blobName, bufferSizeInBytes, numberOfRetries, serverSideEncryption);
+    }
+
+    @Override
+    public void flush(byte[] bytes, int off, int len, boolean closing) throws IOException {
+        if (len > MULTIPART_MAX_SIZE.getBytes()) {
+            throw new IOException("Unable to upload files larger than " + MULTIPART_MAX_SIZE + " to Amazon S3");
+        }
+
+        if (!closing) {
+            if (len < getBufferSize()) {
+                upload(bytes, off, len);
+            } else {
+                if (getFlushCount() == 0) {
+                    initializeMultipart();
+                }
+                uploadMultipart(bytes, off, len, false);
+            }
+        } else {
+            if (multipartId != null) {
+                uploadMultipart(bytes, off, len, true);
+                completeMultipart();
+            } else {
+                upload(bytes, off, len);
+            }
+        }
+    }
+
+    /**
+     * Upload data using a single request.
+     *
+     * @param bytes
+     * @param off
+     * @param len
+     * @throws IOException
+     */
+    private void upload(byte[] bytes, int off, int len) throws IOException {
+        try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
+            int retry = 0;
+            while (retry < getNumberOfRetries()) {
+                try {
+                    doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption());
+                    break;
+                } catch (AmazonS3Exception e) {
+                    if (shouldRetry(e)) {
+                        is.reset();
+                        retry++;
+                    } else {
+                        throw new IOException("Unable to upload object " + getBlobName(), e);
+                    }
+                }
+            }
+        }
+    }
+
+    protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length,
+                            boolean serverSideEncryption) throws AmazonS3Exception {
+        ObjectMetadata md = new ObjectMetadata();
+        if (serverSideEncryption) {
+            md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+        }
+        md.setContentLength(length);
+        blobStore.client().putObject(bucketName, blobName, is, md);
+    }
+
+    private void initializeMultipart() {
+        if (multipartId == null) {
+            multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
+            if (multipartId != null) {
+                multipartChunks = 1;
+                multiparts = new ArrayList<>();
+            }
+        }
+    }
+
+    protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) {
+        InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, blobName);
+        if (serverSideEncryption) {
+            ObjectMetadata md = new ObjectMetadata();
+            md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+            request.setObjectMetadata(md);
+        }
+        return blobStore.client().initiateMultipartUpload(request).getUploadId();
+    }
+
+    private void uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException {
+        try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
+            int retry = 0;
+            while (retry < getNumberOfRetries()) {
+                try {
+                    PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart);
+                    multiparts.add(partETag);
+                    multipartChunks++;
+                    return;
+                } catch (AmazonS3Exception e) {
+                    if (shouldRetry(e) && retry < getNumberOfRetries()) {
+                        is.reset();
+                        retry++;
+                    } else {
+                        abortMultipart();
+                        throw e;
+                    }
+                }
+            }
+        }
+    }
+
+    protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is,
+                                         int length, boolean lastPart) throws AmazonS3Exception {
+        UploadPartRequest request = new UploadPartRequest()
+                .withBucketName(bucketName)
+                .withKey(blobName)
+                .withUploadId(uploadId)
+                .withPartNumber(multipartChunks)
+                .withInputStream(is)
+                .withPartSize(length)
+                .withLastPart(lastPart);
+
+        UploadPartResult response = blobStore.client().uploadPart(request);
+        return response.getPartETag();
+
+    }
+
+    private void completeMultipart() {
+        int retry = 0;
+        while (retry < getNumberOfRetries()) {
+            try {
+                doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts);
+                multipartId = null;
+                return;
+            } catch (AmazonS3Exception e) {
+                if (shouldRetry(e) && retry < getNumberOfRetries()) {
+                    retry++;
+                } else {
+                    abortMultipart();
+                    throw e;
+                }
+            }
+        }
+    }
+
+    protected void doCompleteMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, List<PartETag> parts)
+            throws AmazonS3Exception {
+        CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest(bucketName, blobName, uploadId, parts);
+        blobStore.client().completeMultipartUpload(request);
+    }
+
+    private void abortMultipart() {
+        if (multipartId != null) {
+            try {
+                doAbortMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId);
+            } finally {
+                multipartId = null;
+            }
+        }
+    }
+
+    protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId)
+            throws AmazonS3Exception {
+        blobStore.client().abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, blobName, uploadId));
+    }
+
+    protected boolean shouldRetry(AmazonS3Exception e) {
+        return e.getStatusCode() == 400 && "RequestTimeout".equals(e.getErrorCode());
+    }
+}
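
The protected do* methods isolate the actual S3 calls, which makes the transport replaceable, e.g. in tests. A hypothetical in-memory subclass (not part of this commit) could capture single-request uploads like this, assuming writes stay below buffer_size so only doUpload is exercised:

import com.amazonaws.services.s3.model.AmazonS3Exception;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class InMemoryS3OutputStream extends DefaultS3OutputStream {
    private final ByteArrayOutputStream received = new ByteArrayOutputStream();

    public InMemoryS3OutputStream(S3BlobStore blobStore, String bucket, String blob) {
        super(blobStore, bucket, blob, 5 * 1024 * 1024, 3, false);
    }

    @Override
    protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName,
                            InputStream is, int length, boolean serverSideEncryption) throws AmazonS3Exception {
        // Capture the bytes instead of calling S3's putObject.
        try {
            byte[] buffer = new byte[length];
            int off = 0;
            int read;
            while (off < length && (read = is.read(buffer, off, length - off)) != -1) {
                off += read;
            }
            received.write(buffer, 0, off);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public byte[] receivedBytes() {
        return received.toByteArray();
    }
}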

src/main/java/org/elasticsearch/cloud/aws/blobstore/AbstractS3BlobContainer.java renamed to src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java

Lines changed: 18 additions & 38 deletions
@@ -23,7 +23,6 @@
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
-import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
@@ -35,17 +34,18 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;

 /**
  *
  */
-public class AbstractS3BlobContainer extends AbstractBlobContainer {
+public class S3BlobContainer extends AbstractBlobContainer {

     protected final S3BlobStore blobStore;

     protected final String keyPath;

-    public AbstractS3BlobContainer(BlobPath path, S3BlobStore blobStore) {
+    public S3BlobContainer(BlobPath path, S3BlobStore blobStore) {
         super(path);
         this.blobStore = blobStore;
         String keyPath = path.buildAsString("/");
@@ -74,38 +74,22 @@ public boolean deleteBlob(String blobName) throws IOException {
     }

     @Override
-    public void readBlob(final String blobName, final ReadBlobListener listener) {
-        blobStore.executor().execute(new Runnable() {
-            @Override
-            public void run() {
-                InputStream is;
-                try {
-                    S3Object object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
-                    is = object.getObjectContent();
-                } catch (AmazonS3Exception e) {
-                    if (e.getStatusCode() == 404) {
-                        listener.onFailure(new FileNotFoundException(e.getMessage()));
-                    } else {
-                        listener.onFailure(e);
-                    }
-                    return;
-                } catch (Throwable e) {
-                    listener.onFailure(e);
-                    return;
-                }
-                byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
-                try {
-                    int bytesRead;
-                    while ((bytesRead = is.read(buffer)) != -1) {
-                        listener.onPartial(buffer, 0, bytesRead);
-                    }
-                    listener.onCompleted();
-                } catch (Throwable e) {
-                    IOUtils.closeWhileHandlingException(is);
-                    listener.onFailure(e);
-                }
+    public InputStream openInput(String blobName) throws IOException {
+        try {
+            S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
+            return s3Object.getObjectContent();
+        } catch (AmazonS3Exception e) {
+            if (e.getStatusCode() == 404) {
+                throw new FileNotFoundException(e.getMessage());
             }
-        });
+            throw e;
+        }
+    }
+
+    @Override
+    public OutputStream createOutput(final String blobName) throws IOException {
+        // DefaultS3OutputStream does buffering internally
+        return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName), blobStore.bufferSizeInBytes(), blobStore.numberOfRetries(), blobStore.serverSideEncryption());
     }

     @Override
@@ -145,8 +129,4 @@ protected String buildKey(String blobName) {
         return keyPath + blobName;
     }

-    protected boolean shouldRetry(AmazonS3Exception e) {
-        return e.getStatusCode() == 400 && "RequestTimeout".equals(e.getErrorCode());
-    }
-
 }
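
For context (again, not part of the commit), the streaming BlobContainer interface this change adopts can be exercised roughly as follows; the container argument stands in for any S3BlobContainer instance:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.elasticsearch.common.blobstore.BlobContainer;

public class BlobCopySketch {
    // Copies one blob to another using the new openInput/createOutput methods.
    public static void copy(BlobContainer container, String from, String to) throws IOException {
        try (InputStream in = container.openInput(from);
             OutputStream out = container.createOutput(to)) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }
    }
}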
