Skip to content

Commit 2203f43

Browse files
committed
Merge branch 'master' into feature/per-repo-endpoint
Conflicts: src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
2 parents 0662940 + c8839ee commit 2203f43

File tree

12 files changed

+155
-52
lines changed

12 files changed

+155
-52
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ for the unicast discovery mechanism and add S3 repositories.
77
In order to install the plugin, run:
88

99
```sh
10-
bin/plugin -install elasticsearch/elasticsearch-cloud-aws/2.4.0
10+
bin/plugin install elasticsearch/elasticsearch-cloud-aws/2.4.0
1111
```
1212

1313
You need to install a version matching your Elasticsearch version:

pom.xml

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333

3434
<properties>
3535
<elasticsearch.version>2.0.0-SNAPSHOT</elasticsearch.version>
36-
<lucene.version>4.10.1</lucene.version>
36+
<lucene.version>5.0.0</lucene.version>
37+
<lucene.maven.version>5.0.0-snapshot-1637347</lucene.maven.version>
3738
<amazonaws.version>1.7.13</amazonaws.version>
3839
<tests.output>onerror</tests.output>
3940
<tests.shuffle>true</tests.shuffle>
@@ -44,6 +45,10 @@
4445
</properties>
4546

4647
<repositories>
48+
<repository>
49+
<id>Lucene snapshots</id>
50+
<url>https://download.elasticsearch.org/lucenesnapshots/1637347/</url>
51+
</repository>
4752
<repository>
4853
<id>sonatype</id>
4954
<url>http://oss.sonatype.org/content/repositories/releases/</url>
@@ -63,10 +68,16 @@
6368
<version>1.3.RC2</version>
6469
<scope>test</scope>
6570
</dependency>
71+
<dependency>
72+
<groupId>com.carrotsearch.randomizedtesting</groupId>
73+
<artifactId>randomizedtesting-runner</artifactId>
74+
<version>2.1.10</version>
75+
<scope>test</scope>
76+
</dependency>
6677
<dependency>
6778
<groupId>org.apache.lucene</groupId>
6879
<artifactId>lucene-test-framework</artifactId>
69-
<version>${lucene.version}</version>
80+
<version>${lucene.maven.version}</version>
7081
<scope>test</scope>
7182
</dependency>
7283

@@ -79,7 +90,7 @@
7990
<dependency>
8091
<groupId>org.apache.lucene</groupId>
8192
<artifactId>lucene-core</artifactId>
82-
<version>${lucene.version}</version>
93+
<version>${lucene.maven.version}</version>
8394
<scope>provided</scope>
8495
</dependency>
8596

src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,6 @@ public interface AwsS3Service extends LifecycleComponent<AwsS3Service> {
2929
AmazonS3 client();
3030

3131
AmazonS3 client(String endpoint, String region, String account, String key);
32+
33+
AmazonS3 client(String endpoint, String region, String account, String key, Integer maxRetries);
3234
}

src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,20 @@ public synchronized AmazonS3 client() {
6060
String account = componentSettings.get("access_key", settings.get("cloud.account"));
6161
String key = componentSettings.get("secret_key", settings.get("cloud.key"));
6262

63-
return getClient(endpoint, account, key);
63+
return getClient(endpoint, account, key, null);
6464
}
6565

6666
@Override
67-
public synchronized AmazonS3 client(String endpoint, String region, String account, String key) {
67+
public AmazonS3 client(String endpoint, String region, String account, String key) {
68+
return client(endpoint, region, account, key, null);
69+
}
70+
71+
@Override
72+
public synchronized AmazonS3 client(String endpoint, String region, String account, String key, Integer maxRetries) {
6873
if (endpoint == null) {
6974
endpoint = getDefaultEndpoint();
7075
}
76+
7177
if (region != null) {
7278
endpoint = getEndpoint(region);
7379
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
@@ -77,11 +83,11 @@ public synchronized AmazonS3 client(String endpoint, String region, String accou
7783
key = componentSettings.get("secret_key", settings.get("cloud.key"));
7884
}
7985

80-
return getClient(endpoint, account, key);
86+
return getClient(endpoint, account, key, maxRetries);
8187
}
8288

8389

84-
private synchronized AmazonS3 getClient(String endpoint, String account, String key) {
90+
private synchronized AmazonS3 getClient(String endpoint, String account, String key, Integer maxRetries) {
8591
Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account);
8692
AmazonS3Client client = clients.get(clientDescriptor);
8793
if (client != null) {
@@ -111,6 +117,11 @@ private synchronized AmazonS3 getClient(String endpoint, String account, String
111117
clientConfiguration.withProxyHost(proxyHost).setProxyPort(proxyPort);
112118
}
113119

120+
if (maxRetries != null) {
121+
// If not explicitly set, default to 3 with exponential backoff policy
122+
clientConfiguration.setMaxErrorRetry(maxRetries);
123+
}
124+
114125
AWSCredentialsProvider credentials;
115126

116127
if (account == null && key == null) {

src/main/java/org/elasticsearch/cloud/aws/blobstore/DefaultS3OutputStream.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.cloud.aws.blobstore;
2121

22+
import com.amazonaws.AmazonClientException;
2223
import com.amazonaws.services.s3.model.*;
2324
import org.elasticsearch.common.unit.ByteSizeUnit;
2425
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -96,12 +97,12 @@ public void flush(byte[] bytes, int off, int len, boolean closing) throws IOExce
9697
private void upload(byte[] bytes, int off, int len) throws IOException {
9798
try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
9899
int retry = 0;
99-
while (retry < getNumberOfRetries()) {
100+
while (retry <= getNumberOfRetries()) {
100101
try {
101102
doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption());
102103
break;
103-
} catch (AmazonS3Exception e) {
104-
if (shouldRetry(e)) {
104+
} catch (AmazonClientException e) {
105+
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
105106
is.reset();
106107
retry++;
107108
} else {
@@ -123,11 +124,20 @@ protected void doUpload(S3BlobStore blobStore, String bucketName, String blobNam
123124
}
124125

125126
private void initializeMultipart() {
126-
if (multipartId == null) {
127-
multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
128-
if (multipartId != null) {
129-
multipartChunks = 1;
130-
multiparts = new ArrayList<>();
127+
int retry = 0;
128+
while ((retry <= getNumberOfRetries()) && (multipartId == null)) {
129+
try {
130+
multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
131+
if (multipartId != null) {
132+
multipartChunks = 1;
133+
multiparts = new ArrayList<>();
134+
}
135+
} catch (AmazonClientException e) {
136+
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
137+
retry++;
138+
} else {
139+
throw e;
140+
}
131141
}
132142
}
133143
}
@@ -145,14 +155,14 @@ protected String doInitialize(S3BlobStore blobStore, String bucketName, String b
145155
private void uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException {
146156
try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
147157
int retry = 0;
148-
while (retry < getNumberOfRetries()) {
158+
while (retry <= getNumberOfRetries()) {
149159
try {
150160
PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart);
151161
multiparts.add(partETag);
152162
multipartChunks++;
153163
return;
154-
} catch (AmazonS3Exception e) {
155-
if (shouldRetry(e) && retry < getNumberOfRetries()) {
164+
} catch (AmazonClientException e) {
165+
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
156166
is.reset();
157167
retry++;
158168
} else {
@@ -182,13 +192,13 @@ protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, S
182192

183193
private void completeMultipart() {
184194
int retry = 0;
185-
while (retry < getNumberOfRetries()) {
195+
while (retry <= getNumberOfRetries()) {
186196
try {
187197
doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts);
188198
multipartId = null;
189199
return;
190-
} catch (AmazonS3Exception e) {
191-
if (shouldRetry(e) && retry < getNumberOfRetries()) {
200+
} catch (AmazonClientException e) {
201+
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
192202
retry++;
193203
} else {
194204
abortMultipart();
@@ -218,8 +228,4 @@ protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String
218228
throws AmazonS3Exception {
219229
blobStore.client().abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, blobName, uploadId));
220230
}
221-
222-
protected boolean shouldRetry(AmazonS3Exception e) {
223-
return e.getStatusCode() == 400 && "RequestTimeout".equals(e.getErrorCode());
224-
}
225231
}

src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.cloud.aws.blobstore;
2121

22+
import com.amazonaws.AmazonClientException;
2223
import com.amazonaws.services.s3.model.AmazonS3Exception;
2324
import com.amazonaws.services.s3.model.ObjectListing;
2425
import com.amazonaws.services.s3.model.S3Object;
@@ -68,27 +69,40 @@ public boolean blobExists(String blobName) {
6869
}
6970

7071
@Override
71-
public boolean deleteBlob(String blobName) throws IOException {
72-
blobStore.client().deleteObject(blobStore.bucket(), buildKey(blobName));
73-
return true;
72+
public void deleteBlob(String blobName) throws IOException {
73+
try {
74+
blobStore.client().deleteObject(blobStore.bucket(), buildKey(blobName));
75+
} catch (AmazonClientException e) {
76+
throw new IOException("Exception when deleting blob [" + blobName + "]", e);
77+
}
7478
}
7579

7680
@Override
7781
public InputStream openInput(String blobName) throws IOException {
78-
try {
79-
S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
80-
return s3Object.getObjectContent();
81-
} catch (AmazonS3Exception e) {
82-
if (e.getStatusCode() == 404) {
83-
throw new FileNotFoundException(e.getMessage());
82+
int retry = 0;
83+
while (retry <= blobStore.numberOfRetries()) {
84+
try {
85+
S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
86+
return s3Object.getObjectContent();
87+
} catch (AmazonClientException e) {
88+
if (blobStore.shouldRetry(e) && (retry < blobStore.numberOfRetries())) {
89+
retry++;
90+
} else {
91+
if (e instanceof AmazonS3Exception) {
92+
if (404 == ((AmazonS3Exception) e).getStatusCode()) {
93+
throw new FileNotFoundException("Blob object [" + blobName + "] not found: " + e.getMessage());
94+
}
95+
}
96+
throw e;
97+
}
8498
}
85-
throw e;
8699
}
100+
throw new BlobStoreException("retries exhausted while attempting to access blob object [name:" + blobName + ", bucket:" + blobStore.bucket() +"]");
87101
}
88102

89103
@Override
90104
public OutputStream createOutput(final String blobName) throws IOException {
91-
// UploadS3OutputStream does buffering internally
105+
// UploadS3OutputStream does buffering & retry logic internally
92106
return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName), blobStore.bufferSizeInBytes(), blobStore.numberOfRetries(), blobStore.serverSideEncryption());
93107
}
94108

src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
package org.elasticsearch.cloud.aws.blobstore;
2121

22+
import com.amazonaws.AmazonClientException;
2223
import com.amazonaws.services.s3.AmazonS3;
24+
import com.amazonaws.services.s3.model.AmazonS3Exception;
2325
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
2426
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
2527
import com.amazonaws.services.s3.model.ObjectListing;
@@ -55,12 +57,8 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
5557

5658
private final int numberOfRetries;
5759

58-
59-
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, String region, boolean serverSideEncryption) {
60-
this(settings, client, bucket, region, serverSideEncryption, null);
61-
}
62-
63-
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption, ByteSizeValue bufferSize) {
60+
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption,
61+
ByteSizeValue bufferSize, int maxRetries) {
6462
super(settings);
6563
this.client = client;
6664
this.bucket = bucket;
@@ -72,7 +70,7 @@ public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable
7270
throw new BlobStoreException("\"Detected a buffer_size for the S3 storage lower than [" + MIN_BUFFER_SIZE + "]");
7371
}
7472

75-
this.numberOfRetries = settings.getAsInt("max_retries", 3);
73+
this.numberOfRetries = maxRetries;
7674
if (!client.doesBucketExist(bucket)) {
7775
if (region != null) {
7876
client.createBucket(bucket, region);
@@ -152,6 +150,16 @@ public void delete(BlobPath path) {
152150
}
153151
}
154152

153+
protected boolean shouldRetry(AmazonClientException e) {
154+
if (e instanceof AmazonS3Exception) {
155+
AmazonS3Exception s3e = (AmazonS3Exception)e;
156+
if (s3e.getStatusCode() == 400 && "RequestTimeout".equals(s3e.getErrorCode())) {
157+
return true;
158+
}
159+
}
160+
return e.isRetryable();
161+
}
162+
155163
@Override
156164
public void close() {
157165
}

src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.cluster.ClusterName;
2424
import org.elasticsearch.cluster.ClusterService;
2525
import org.elasticsearch.cluster.node.DiscoveryNodeService;
26+
import org.elasticsearch.cluster.settings.DynamicSettings;
2627
import org.elasticsearch.common.collect.ImmutableList;
2728
import org.elasticsearch.common.inject.Inject;
2829
import org.elasticsearch.common.settings.Settings;
@@ -45,9 +46,9 @@ public class Ec2Discovery extends ZenDiscovery {
4546
public Ec2Discovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService,
4647
ClusterService clusterService, NodeSettingsService nodeSettingsService, ZenPingService pingService,
4748
DiscoveryNodeService discoveryNodeService, AwsEc2Service ec2Service, DiscoverySettings discoverySettings,
48-
ElectMasterService electMasterService) {
49+
ElectMasterService electMasterService, DynamicSettings dynamicSettings) {
4950
super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService,
50-
discoveryNodeService, pingService, electMasterService, discoverySettings);
51+
discoveryNodeService, pingService, electMasterService, discoverySettings, dynamicSettings);
5152
if (settings.getAsBoolean("cloud.enabled", true)) {
5253
ImmutableList<? extends ZenPing> zenPings = pingService.zenPings();
5354
UnicastZenPing unicastZenPing = null;

src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,14 @@ public S3Repository(RepositoryName name, RepositorySettings repositorySettings,
122122

123123
boolean serverSideEncryption = repositorySettings.settings().getAsBoolean("server_side_encryption", componentSettings.getAsBoolean("server_side_encryption", false));
124124
ByteSizeValue bufferSize = repositorySettings.settings().getAsBytesSize("buffer_size", componentSettings.getAsBytesSize("buffer_size", null));
125-
logger.debug("using bucket [{}], region [{}], endpoint [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}]", bucket, region, endpoint, chunkSize, serverSideEncryption, bufferSize);
126-
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, serverSideEncryption, bufferSize);
125+
Integer maxRetries = repositorySettings.settings().getAsInt("max_retries", componentSettings.getAsInt("max_retries", 3));
127126
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
128127
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
128+
129+
logger.debug("using bucket [{}], region [{}], endpoint [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}]",
130+
bucket, region, endpoint, chunkSize, serverSideEncryption, bufferSize, maxRetries);
131+
132+
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries), bucket, region, serverSideEncryption, bufferSize, maxRetries);
129133
String basePath = repositorySettings.settings().get("base_path", null);
130134
if (Strings.hasLength(basePath)) {
131135
BlobPath path = new BlobPath();

src/test/java/org/elasticsearch/cloud/aws/AbstractAwsTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ protected Settings nodeSettings(int nodeOrdinal) {
6060
.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, true)
6161
.put(AwsModule.S3_SERVICE_TYPE_KEY, TestAwsS3Service.class)
6262
.put("cloud.aws.test.random", randomInt())
63-
.put("cloud.aws.test.write_failures", 0.1);
63+
.put("cloud.aws.test.write_failures", 0.1)
64+
.put("cloud.aws.test.read_failures", 0.1);
6465

6566
Environment environment = new Environment();
6667

0 commit comments

Comments
 (0)