From 4f896d04a54cfc7f273833adfe415819d3231089 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 13 Feb 2019 15:29:46 +0100 Subject: [PATCH 1/6] Wait for active shards when closing indices --- .../rest-api-spec/api/indices.close.json | 4 + .../test/cat.indices/10_basic.yml | 75 +++++++++++- .../test/indices.open/10_basic.yml | 25 +++- .../test/indices.open/20_multiple_indices.yml | 6 + .../CloseIndexClusterStateUpdateRequest.java | 18 ++- .../indices/close/CloseIndexRequest.java | 20 +++ .../close/CloseIndexRequestBuilder.java | 28 +++++ .../close/TransportCloseIndexAction.java | 1 + .../metadata/MetaDataIndexStateService.java | 21 +++- .../admin/indices/RestCloseIndexAction.java | 5 + .../indices/close/CloseIndexRequestTests.java | 114 ++++++++++++++++++ .../index/shard/IndexShardIT.java | 4 +- .../indices/IndicesLifecycleListenerIT.java | 10 +- .../indices/state/CloseIndexIT.java | 5 +- .../indices/state/SimpleIndexStateIT.java | 2 +- .../xpack/ccr/CloseFollowerIndexIT.java | 1 + 16 files changed, 328 insertions(+), 11 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestTests.java diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.close.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.close.json index 4eaa93030ee7b..55fd245f26c91 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.close.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.close.json @@ -34,6 +34,10 @@ "options" : ["open","closed","none","all"], "default" : "open", "description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both." + }, + "wait_for_active_shards": { + "type" : "string", + "description" : "Sets the number of active shards to wait for before the operation returns." } } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.indices/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.indices/10_basic.yml index c7eddf42d1b03..7bb503d976d11 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.indices/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.indices/10_basic.yml @@ -1,5 +1,5 @@ --- -"Test cat indices output": +"Test cat indices output (no indices)": - do: cat.indices: {} @@ -8,6 +8,9 @@ $body: | /^$/ +--- +"Test cat indices output": + - do: indices.create: index: index1 @@ -47,9 +50,26 @@ (\d\d\d\d\-\d\d\-\d\dT\d\d:\d\d:\d\d.\d\d\dZ) \s* ) $/ + +--- +"Test cat indices output for closed index (pre 8.0.0)": + + - skip: + version: "8.0.0 - " + reason: "closed indices are replicated starting version 8.0" + + - do: + indices.create: + index: index1 + body: + settings: + number_of_shards: "1" + number_of_replicas: "0" + - do: indices.close: index: index1 + - is_true: acknowledged - do: cat.indices: @@ -70,6 +90,59 @@ ) $/ +--- +"Test cat indices output for closed index": + + - skip: + version: " - 7.99.99" + reason: "closed indices are replicated starting version 8.0" + + - do: + indices.create: + index: index1 + body: + settings: + number_of_shards: "1" + number_of_replicas: "0" + + - do: + bulk: + body: + - '{"index": {"_index": "index1"}}' + - '{"f1": "1"}' + - '{"index": {"_index": "index1"}}' + - '{"f1": "2"}' + - '{"index": {"_index": "index1"}}' + - '{"f1": "3"}' + - '{"index": {"_index": "index1"}}' + - '{"f1": "4"}' + - '{"index": {"_index": "index1"}}' + - '{"f1": "5"}' + + - do: + indices.close: + index: index1 + - is_true: acknowledged + + - do: + cat.indices: + index: index* + + - match: + $body: | + /^( \s+ + close \s+ + index1 \s+ + ([a-zA-Z0-9=/_+]|[\\\-]){22} \s+ + \s+ + \s+ + 5 \s+ + 0 \s+ + (\d+|\d+[.]\d+)(kb|b) \s+ + (\d+|\d+[.]\d+)(kb|b) \s* + ) + $/ + --- "Test cat indices using health status": diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml index 64e59d5939287..fcb49acfddf1d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml @@ -14,6 +14,7 @@ - do: indices.close: index: test_index + - is_true: acknowledged - do: catch: bad_request @@ -24,6 +25,7 @@ - do: indices.open: index: test_index + - is_true: acknowledged - do: cluster.health: @@ -50,11 +52,32 @@ - do: indices.close: index: test_index + - is_true: acknowledged - do: indices.open: index: test_index wait_for_active_shards: all - + - is_true: acknowledged - match: { acknowledged: true } - match: { shards_acknowledged: true } + +--- +"Close index with wait_for_active_shards set to all": + - skip: + version: " - 7.99.99" + reason: "closed indices are replicated starting version 8.0" + + - do: + indices.create: + index: test_index + body: + settings: + number_of_replicas: 0 + + - do: + indices.close: + index: test_index + wait_for_active_shards: all + - is_true: acknowledged + - match: { acknowledged: true } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/20_multiple_indices.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/20_multiple_indices.yml index 8e1bf660f6378..bef5ea8a54651 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/20_multiple_indices.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/20_multiple_indices.yml @@ -26,6 +26,7 @@ setup: - do: indices.close: index: _all + - is_true: acknowledged - do: catch: bad_request @@ -36,6 +37,7 @@ setup: - do: indices.open: index: _all + - is_true: acknowledged - do: cluster.health: @@ -51,6 +53,7 @@ setup: - do: indices.close: index: test_* + - is_true: acknowledged - do: catch: bad_request @@ -61,6 +64,7 @@ setup: - do: indices.open: index: test_* + - is_true: acknowledged - do: cluster.health: @@ -76,6 +80,7 @@ setup: - do: indices.close: index: '*' + - is_true: acknowledged - do: catch: bad_request @@ -86,6 +91,7 @@ setup: - do: indices.open: index: '*' + - is_true: acknowledged - do: cluster.health: diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java index bb0f98ac07b7e..955ddf6fe8a76 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.action.admin.indices.close; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest; /** @@ -25,7 +26,8 @@ */ public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest { - private final long taskId; + private long taskId; + private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; public CloseIndexClusterStateUpdateRequest(final long taskId) { this.taskId = taskId; @@ -34,4 +36,18 @@ public CloseIndexClusterStateUpdateRequest(final long taskId) { public long taskId() { return taskId; } + + public CloseIndexClusterStateUpdateRequest taskId(final long taskId) { + this.taskId = taskId; + return this; + } + + public ActiveShardCount waitForActiveShards() { + return waitForActiveShards; + } + + public CloseIndexClusterStateUpdateRequest waitForActiveShards(final ActiveShardCount waitForActiveShards) { + this.waitForActiveShards = waitForActiveShards; + return this; + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java index 272bae9425712..230091ecab60e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java @@ -19,8 +19,10 @@ package org.elasticsearch.action.admin.indices.close; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.io.stream.StreamInput; @@ -38,6 +40,7 @@ public class CloseIndexRequest extends AcknowledgedRequest im private String[] indices; private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen(); + private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; public CloseIndexRequest() { } @@ -101,11 +104,25 @@ public CloseIndexRequest indicesOptions(IndicesOptions indicesOptions) { return this; } + public ActiveShardCount waitForActiveShards() { + return waitForActiveShards; + } + + public CloseIndexRequest waitForActiveShards(final ActiveShardCount waitForActiveShards) { + this.waitForActiveShards = waitForActiveShards; + return this; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); indices = in.readStringArray(); indicesOptions = IndicesOptions.readIndicesOptions(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + waitForActiveShards = ActiveShardCount.readFrom(in); + } else { + waitForActiveShards = ActiveShardCount.NONE; + } } @Override @@ -113,5 +130,8 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeStringArray(indices); indicesOptions.writeIndicesOptions(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + waitForActiveShards.writeTo(out); + } } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestBuilder.java index e69c6fed87dcd..243eb1e873ec1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestBuilder.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.close; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -60,4 +61,31 @@ public CloseIndexRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) request.indicesOptions(indicesOptions); return this; } + + /** + * Sets the number of shard copies that should be active for indices closing to return. + * Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy + * (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to + * wait for all shards (primary and all replicas) to be active before returning. + * Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * Indices closing will only wait up until the timeout value for the number of shard copies + * to be active before returning. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public CloseIndexRequestBuilder setWaitForActiveShards(final ActiveShardCount waitForActiveShards) { + request.waitForActiveShards(waitForActiveShards); + return this; + } + + /** + * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical + * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)} + * to get the ActiveShardCount. + */ + public CloseIndexRequestBuilder setWaitForActiveShards(final int waitForActiveShards) { + return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards)); + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index bb3db084b0c53..dcc9688fab62c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -114,6 +114,7 @@ protected void masterOperation(final Task task, final CloseIndexRequest request, final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId()) .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()) + .waitForActiveShards(request.waitForActiveShards()) .indices(concreteIndices); indexStateService.closeIndices(closeRequest, new ActionListener() { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 3e9143320c53c..2dba9b695ac11 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -170,7 +170,26 @@ public void onFailure(final String source, final Exception e) { @Override public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { - listener.onResponse(new AcknowledgedResponse(acknowledged)); + + final AcknowledgedResponse response = new AcknowledgedResponse(acknowledged); + final String[] indices = results.entrySet().stream() + .filter(result -> result.getValue().isAcknowledged()) + .map(result -> result.getKey().getName()) + .filter(index -> newState.routingTable().hasIndex(index)) + .toArray(String[]::new); + + if (indices.length > 0) { + activeShardsObserver.waitForActiveShards(indices, request.waitForActiveShards(), + request.ackTimeout(), shardsAcknowledged -> { + if (shardsAcknowledged == false) { + logger.debug("[{}] indices closed, but the operation timed out while waiting " + + "for enough shards to be started.", Arrays.toString(indices)); + } + listener.onResponse(response); + }, listener::onFailure); + } else { + listener.onResponse(response); + } } }), listener::onFailure) diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCloseIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCloseIndexAction.java index b2475cafcbeb6..3ee2687eb7288 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCloseIndexAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.rest.action.admin.indices; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.Strings; @@ -49,6 +50,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC closeIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", closeIndexRequest.masterNodeTimeout())); closeIndexRequest.timeout(request.paramAsTime("timeout", closeIndexRequest.timeout())); closeIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, closeIndexRequest.indicesOptions())); + String waitForActiveShards = request.param("wait_for_active_shards"); + if (waitForActiveShards != null) { + closeIndexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); + } return channel -> client.admin().indices().close(closeIndexRequest, new RestToXContentListener<>(channel)); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestTests.java new file mode 100644 index 0000000000000..53b39027a697b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestTests.java @@ -0,0 +1,114 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.close; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; + +import static org.elasticsearch.test.VersionUtils.randomVersionBetween; + +public class CloseIndexRequestTests extends ESTestCase { + + public void testSerialization() throws Exception { + final CloseIndexRequest request = randomRequest(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + + final CloseIndexRequest deserializedRequest = new CloseIndexRequest(); + try (StreamInput in = out.bytes().streamInput()) { + deserializedRequest.readFrom(in); + } + assertEquals(request.timeout(), deserializedRequest.timeout()); + assertEquals(request.masterNodeTimeout(), deserializedRequest.masterNodeTimeout()); + assertEquals(request.indicesOptions(), deserializedRequest.indicesOptions()); + assertEquals(request.getParentTask(), deserializedRequest.getParentTask()); + assertEquals(request.waitForActiveShards(), deserializedRequest.waitForActiveShards()); + assertArrayEquals(request.indices(), deserializedRequest.indices()); + } + } + + public void testBwcSerialization() throws Exception { + { + final CloseIndexRequest request = randomRequest(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(randomVersionBetween(random(), Version.V_6_4_0, VersionUtils.getPreviousVersion(Version.V_8_0_0))); + request.writeTo(out); + + try (StreamInput in = out.bytes().streamInput()) { + assertEquals(request.getParentTask(), TaskId.readFromStream(in)); + assertEquals(request.masterNodeTimeout(), in.readTimeValue()); + assertEquals(request.timeout(), in.readTimeValue()); + assertArrayEquals(request.indices(), in.readStringArray()); + assertEquals(request.indicesOptions(), IndicesOptions.readIndicesOptions(in)); + } + } + } + { + final CloseIndexRequest sample = randomRequest(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + sample.getParentTask().writeTo(out); + out.writeTimeValue(sample.masterNodeTimeout()); + out.writeTimeValue(sample.timeout()); + out.writeStringArray(sample.indices()); + sample.indicesOptions().writeIndicesOptions(out); + + final CloseIndexRequest deserializedRequest = new CloseIndexRequest(); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(randomVersionBetween(random(), Version.V_6_4_0, VersionUtils.getPreviousVersion(Version.V_8_0_0))); + deserializedRequest.readFrom(in); + } + assertEquals(sample.getParentTask(), deserializedRequest.getParentTask()); + assertEquals(sample.masterNodeTimeout(), deserializedRequest.masterNodeTimeout()); + assertEquals(sample.timeout(), deserializedRequest.timeout()); + assertArrayEquals(sample.indices(), deserializedRequest.indices()); + assertEquals(sample.indicesOptions(), deserializedRequest.indicesOptions()); + assertEquals(ActiveShardCount.NONE, deserializedRequest.waitForActiveShards()); + } + } + } + + private CloseIndexRequest randomRequest() { + CloseIndexRequest request = new CloseIndexRequest(); + request.indices(generateRandomStringArray(10, 5, false, false)); + if (randomBoolean()) { + request.indicesOptions(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); + } + if (randomBoolean()) { + request.timeout(randomPositiveTimeValue()); + } + if (randomBoolean()) { + request.masterNodeTimeout(randomPositiveTimeValue()); + } + if (randomBoolean()) { + request.setParentTask(randomAlphaOfLength(5), randomNonNegativeLong()); + } + if (randomBoolean()) { + request.waitForActiveShards(randomFrom(ActiveShardCount.DEFAULT, ActiveShardCount.NONE, ActiveShardCount.ONE, + ActiveShardCount.ALL)); + } + return request; + } +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 5b861e7d52bd5..ca54f56375ee2 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -908,7 +908,7 @@ public void testShardChangesWithDefaultDocType() throws Exception { * Test that the {@link org.elasticsearch.index.engine.NoOpEngine} takes precedence over other * engine factories if the index is closed. */ - public void testNoOpEngineFactoryTakesPrecedence() throws IOException { + public void testNoOpEngineFactoryTakesPrecedence() { final String indexName = "closed-index"; createIndex(indexName, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build()); ensureGreen(); @@ -920,7 +920,7 @@ public void testNoOpEngineFactoryTakesPrecedence() throws IOException { final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); final IndicesService indicesService = getInstanceFromNode(IndicesService.class); - final IndexService indexService = indicesService.createIndex(indexMetaData, Collections.emptyList()); + final IndexService indexService = indicesService.indexServiceSafe(indexMetaData.getIndex()); for (IndexShard indexShard : indexService) { assertThat(indexShard.getEngine(), instanceOf(NoOpEngine.class)); diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java index 81cea988cd020..ac83c50fea6ae 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -211,8 +212,13 @@ public void testIndexStateShardChanged() throws Throwable { assertThat(stateChangeListenerNode1.afterCloseSettings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1), equalTo(6)); assertThat(stateChangeListenerNode1.afterCloseSettings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1), equalTo(1)); - assertShardStatesMatch(stateChangeListenerNode1, 6, CLOSED); - assertShardStatesMatch(stateChangeListenerNode2, 6, CLOSED); + if (Version.CURRENT.onOrAfter(Version.V_8_0_0)) { + assertShardStatesMatch(stateChangeListenerNode1, 6, CLOSED, CREATED, RECOVERING, POST_RECOVERY, STARTED); + assertShardStatesMatch(stateChangeListenerNode2, 6, CLOSED, CREATED, RECOVERING, POST_RECOVERY, STARTED); + } else { + assertShardStatesMatch(stateChangeListenerNode1, 6, CLOSED); + assertShardStatesMatch(stateChangeListenerNode2, 6, CLOSED); + } } private static void assertShardStatesMatch(final IndexShardStateChangeListener stateChangeListener, diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index ca3f6e694097d..6da7c93d28895 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -112,7 +112,8 @@ public void testCloseAlreadyClosedIndex() throws Exception { assertIndexIsClosed(indexName); // Second close should be acked too - assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName))); + final ActiveShardCount activeShardCount = randomFrom(ActiveShardCount.NONE, ActiveShardCount.DEFAULT, ActiveShardCount.ALL); + assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName).setWaitForActiveShards(activeShardCount))); assertIndexIsClosed(indexName); } @@ -126,7 +127,7 @@ public void testCloseUnassignedIndex() throws Exception { assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.OPEN)); assertThat(clusterState.routingTable().allShards().stream().allMatch(ShardRouting::unassigned), is(true)); - assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName))); + assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName).setWaitForActiveShards(ActiveShardCount.NONE))); assertIndexIsClosed(indexName); } diff --git a/server/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java b/server/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java index 050d77a223101..854dba7fb894b 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java @@ -102,7 +102,7 @@ public void testFastCloseAfterCreateContinuesCreateAfterOpen() { assertThat(health.isTimedOut(), equalTo(false)); assertThat(health.getStatus(), equalTo(ClusterHealthStatus.RED)); - assertAcked(client().admin().indices().prepareClose("test")); + assertAcked(client().admin().indices().prepareClose("test").setWaitForActiveShards(ActiveShardCount.NONE)); logger.info("--> updating test index settings to allow allocation"); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java index 0551d30c2e73a..bd9f1a13bdb4d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java @@ -31,6 +31,7 @@ public class CloseFollowerIndexIT extends CcrIntegTestCase { + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/38767") public void testCloseAndReopenFollowerIndex() throws Exception { final String leaderIndexSettings = getIndexSettings(1, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); From b029048f9e6ece9e39cb49d49a495b4b6974ea0f Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 15 Feb 2019 13:44:24 +0100 Subject: [PATCH 2/6] Improve CloseIndexResponse --- .../admin/indices/close/CloseIndexAction.java | 7 +- .../indices/close/CloseIndexRequest.java | 2 +- .../close/CloseIndexRequestBuilder.java | 3 +- .../indices/close/CloseIndexResponse.java | 52 +++++++++++ .../close/TransportCloseIndexAction.java | 26 +++--- .../client/IndicesAdminClient.java | 5 +- .../client/support/AbstractClient.java | 5 +- .../metadata/MetaDataIndexStateService.java | 14 +-- .../close/CloseIndexResponseTests.java | 86 +++++++++++++++++++ .../indices/state/CloseIndexIT.java | 21 +++++ .../indices/state/ReopenWhileClosingIT.java | 6 +- .../ccr/action/ShardFollowTasksExecutor.java | 3 +- .../action/TransportFreezeIndexAction.java | 5 +- 13 files changed, 201 insertions(+), 34 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexAction.java index 68a911ff58627..5c3d60dd44013 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexAction.java @@ -20,9 +20,8 @@ package org.elasticsearch.action.admin.indices.close; import org.elasticsearch.action.Action; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -public class CloseIndexAction extends Action { +public class CloseIndexAction extends Action { public static final CloseIndexAction INSTANCE = new CloseIndexAction(); public static final String NAME = "indices:admin/close"; @@ -32,7 +31,7 @@ private CloseIndexAction() { } @Override - public AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); + public CloseIndexResponse newResponse() { + return new CloseIndexResponse(); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java index 230091ecab60e..e7979beb68214 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java @@ -40,7 +40,7 @@ public class CloseIndexRequest extends AcknowledgedRequest im private String[] indices; private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen(); - private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; + private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; //NORELEASE Changes this to NONE to keep previous behavior public CloseIndexRequest() { } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestBuilder.java index 243eb1e873ec1..7db79e0c3e550 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestBuilder.java @@ -22,14 +22,13 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.ElasticsearchClient; /** * Builder for close index request */ public class CloseIndexRequestBuilder - extends AcknowledgedRequestBuilder { + extends AcknowledgedRequestBuilder { public CloseIndexRequestBuilder(ElasticsearchClient client, CloseIndexAction action) { super(client, action, new CloseIndexRequest()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java new file mode 100644 index 0000000000000..189712f0fca78 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.close; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class CloseIndexResponse extends ShardsAcknowledgedResponse { + + CloseIndexResponse() { + } + + public CloseIndexResponse(final boolean acknowledged, final boolean shardsAcknowledged) { + super(acknowledged, shardsAcknowledged); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + readShardsAcknowledged(in); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + writeShardsAcknowledged(out); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index dcc9688fab62c..05f680af57ddf 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DestructiveOperations; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -44,7 +43,7 @@ /** * Close index action */ -public class TransportCloseIndexAction extends TransportMasterNodeAction { +public class TransportCloseIndexAction extends TransportMasterNodeAction { private final MetaDataIndexStateService indexStateService; private final DestructiveOperations destructiveOperations; @@ -76,12 +75,12 @@ protected String executor() { } @Override - protected AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); + protected CloseIndexResponse newResponse() { + return new CloseIndexResponse(); } @Override - protected void doExecute(Task task, CloseIndexRequest request, ActionListener listener) { + protected void doExecute(Task task, CloseIndexRequest request, ActionListener listener) { destructiveOperations.failDestructive(request.indices()); if (closeIndexEnabled == false) { throw new IllegalStateException("closing indices is disabled - set [" + CLUSTER_INDICES_CLOSE_ENABLE_SETTING.getKey() + @@ -97,17 +96,20 @@ protected ClusterBlockException checkBlock(CloseIndexRequest request, ClusterSta } @Override - protected void masterOperation(final CloseIndexRequest request, final ClusterState state, - final ActionListener listener) { + protected void masterOperation(final CloseIndexRequest request, + final ClusterState state, + final ActionListener listener) { throw new UnsupportedOperationException("The task parameter is required"); } @Override - protected void masterOperation(final Task task, final CloseIndexRequest request, final ClusterState state, - final ActionListener listener) throws Exception { + protected void masterOperation(final Task task, + final CloseIndexRequest request, + final ClusterState state, + final ActionListener listener) throws Exception { final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); if (concreteIndices == null || concreteIndices.length == 0) { - listener.onResponse(new AcknowledgedResponse(true)); + listener.onResponse(new CloseIndexResponse(true, false)); return; } @@ -117,10 +119,10 @@ protected void masterOperation(final Task task, final CloseIndexRequest request, .waitForActiveShards(request.waitForActiveShards()) .indices(concreteIndices); - indexStateService.closeIndices(closeRequest, new ActionListener() { + indexStateService.closeIndices(closeRequest, new ActionListener() { @Override - public void onResponse(final AcknowledgedResponse response) { + public void onResponse(final CloseIndexResponse response) { listener.onResponse(response); } diff --git a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index 718dde98a0f97..d5a73981f29f1 100644 --- a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexRequestBuilder; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -307,7 +308,7 @@ public interface IndicesAdminClient extends ElasticsearchClient { * @return The result future * @see org.elasticsearch.client.Requests#closeIndexRequest(String) */ - ActionFuture close(CloseIndexRequest request); + ActionFuture close(CloseIndexRequest request); /** * Closes an index based on the index name. @@ -316,7 +317,7 @@ public interface IndicesAdminClient extends ElasticsearchClient { * @param listener A listener to be notified with a result * @see org.elasticsearch.client.Requests#closeIndexRequest(String) */ - void close(CloseIndexRequest request, ActionListener listener); + void close(CloseIndexRequest request, ActionListener listener); /** * Closes one or more indices based on their index name. diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 3fc931a85c0f7..e79f0567babe6 100644 --- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -152,6 +152,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexAction; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexRequestBuilder; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; @@ -1355,12 +1356,12 @@ public DeleteIndexRequestBuilder prepareDelete(String... indices) { } @Override - public ActionFuture close(final CloseIndexRequest request) { + public ActionFuture close(final CloseIndexRequest request) { return execute(CloseIndexAction.INSTANCE, request); } @Override - public void close(final CloseIndexRequest request, final ActionListener listener) { + public void close(final CloseIndexRequest request, final ActionListener listener) { execute(CloseIndexAction.INSTANCE, request, listener); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 2dba9b695ac11..eb6bca67b8d2c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NotifyOnceListener; import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction; import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; import org.elasticsearch.action.support.ActiveShardsObserver; @@ -119,7 +120,7 @@ public MetaDataIndexStateService(ClusterService clusterService, AllocationServic * Closing indices is a 3 steps process: it first adds a write block to every indices to close, then waits for the operations on shards * to be terminated and finally closes the indices by moving their state to CLOSE. */ - public void closeIndices(final CloseIndexClusterStateUpdateRequest request, final ActionListener listener) { + public void closeIndices(final CloseIndexClusterStateUpdateRequest request, final ActionListener listener) { final Index[] concreteIndices = request.indices(); if (concreteIndices == null || concreteIndices.length == 0) { throw new IllegalArgumentException("Index name is required"); @@ -139,7 +140,7 @@ public ClusterState execute(final ClusterState currentState) { public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { if (oldState == newState) { assert blockedIndices.isEmpty() : "List of blocked indices is not empty but cluster state wasn't changed"; - listener.onResponse(new AcknowledgedResponse(true)); + listener.onResponse(new CloseIndexResponse(true, false)); } else { assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed"; threadPool.executor(ThreadPool.Names.MANAGEMENT) @@ -171,7 +172,6 @@ public void onFailure(final String source, final Exception e) { public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { - final AcknowledgedResponse response = new AcknowledgedResponse(acknowledged); final String[] indices = results.entrySet().stream() .filter(result -> result.getValue().isAcknowledged()) .map(result -> result.getKey().getName()) @@ -185,10 +185,14 @@ public void clusterStateProcessed(final String source, logger.debug("[{}] indices closed, but the operation timed out while waiting " + "for enough shards to be started.", Arrays.toString(indices)); } - listener.onResponse(response); + // acknowledged maybe be false but some indices may have been correctly closed, so + // we maintain a kind of coherency by overriding the shardsAcknowledged value + // (see ShardsAcknowledgedResponse constructor) + boolean shardsAcked = acknowledged ? shardsAcknowledged : false; + listener.onResponse(new CloseIndexResponse(acknowledged, shardsAcked)); }, listener::onFailure); } else { - listener.onResponse(response); + listener.onResponse(new CloseIndexResponse(acknowledged, false)); } } }), diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java new file mode 100644 index 0000000000000..dc859cfab63a9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.close; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; + +import static org.elasticsearch.test.VersionUtils.randomVersionBetween; +import static org.hamcrest.Matchers.equalTo; + +public class CloseIndexResponseTests extends ESTestCase { + + public void testSerialization() throws Exception { + final CloseIndexResponse response = randomResponse(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + response.writeTo(out); + + final CloseIndexResponse deserializedResponse = new CloseIndexResponse(); + try (StreamInput in = out.bytes().streamInput()) { + deserializedResponse.readFrom(in); + } + assertCloseIndexResponse(deserializedResponse, response); + } + } + + public void testBwcSerialization() throws Exception { + { + final CloseIndexResponse response = randomResponse(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_8_0_0))); + response.writeTo(out); + + final AcknowledgedResponse deserializedResponse = new AcknowledgedResponse(); + try (StreamInput in = out.bytes().streamInput()) { + deserializedResponse.readFrom(in); + } + assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); + } + } + { + final AcknowledgedResponse response = new AcknowledgedResponse(randomBoolean()); + try (BytesStreamOutput out = new BytesStreamOutput()) { + response.writeTo(out); + + final CloseIndexResponse deserializedResponse = new CloseIndexResponse(); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_8_0_0))); + deserializedResponse.readFrom(in); + } + assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); + } + } + } + + private CloseIndexResponse randomResponse() { + final boolean acknowledged = randomBoolean(); + final boolean shardsAcknowledged = acknowledged ? randomBoolean() : false; + return new CloseIndexResponse(acknowledged, shardsAcknowledged); + } + + private static void assertCloseIndexResponse(final CloseIndexResponse actual, final CloseIndexResponse expected) { + assertThat(actual.isAcknowledged(), equalTo(expected.isAcknowledged())); + assertThat(actual.isShardsAcknowledged(), equalTo(expected.isShardsAcknowledged())); + } +} diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 6da7c93d28895..ffb5615cfdca2 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -20,9 +20,11 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.routing.ShardRouting; @@ -306,6 +308,25 @@ public void testConcurrentClosesAndOpens() throws Exception { indexer.totalIndexedDocs()); } + public void testCloseIndexWaitForActiveShards() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) // no replicas to avoid recoveries that could fail the index closing + .build()); + + final int nbDocs = randomIntBetween(0, 50); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs) + .mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList())); + ensureGreen(indexName); + + final CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose(indexName).get(); + assertThat(client().admin().cluster().prepareHealth(indexName).get().getStatus(), is(ClusterHealthStatus.GREEN)); + assertTrue(closeIndexResponse.isAcknowledged()); + assertTrue(closeIndexResponse.isShardsAcknowledged()); + assertIndexIsClosed(indexName); + } + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) { diff --git a/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java b/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java index 083c5ab1f5510..25d8f07bbd1cd 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java @@ -20,8 +20,8 @@ package org.elasticsearch.indices.state; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -72,7 +72,7 @@ public void testReopenDuringClose() throws Exception { final CountDownLatch block = new CountDownLatch(1); final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(indexName, block::countDown); - ActionFuture closeIndexResponse = client().admin().indices().prepareClose(indexName).execute(); + ActionFuture closeIndexResponse = client().admin().indices().prepareClose(indexName).execute(); assertTrue("Waiting for index to have a closing blocked", block.await(60, TimeUnit.SECONDS)); assertIndexIsBlocked(indexName); assertFalse(closeIndexResponse.isDone()); @@ -96,7 +96,7 @@ public void testReopenDuringCloseOnMultipleIndices() throws Exception { final CountDownLatch block = new CountDownLatch(1); final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(randomFrom(indices), block::countDown); - ActionFuture closeIndexResponse = client().admin().indices().prepareClose("index-*").execute(); + ActionFuture closeIndexResponse = client().admin().indices().prepareClose("index-*").execute(); assertTrue("Waiting for index to have a closing blocked", block.await(60, TimeUnit.SECONDS)); assertFalse(closeIndexResponse.isDone()); indices.forEach(ReopenWhileClosingIT::assertIndexIsBlocked); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index d9af0d6e71c04..01b9a20c30c1a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; @@ -183,7 +184,7 @@ private void closeIndexUpdateSettingsAndOpenIndex(String followIndex, Runnable handler, Consumer onFailure) { CloseIndexRequest closeRequest = new CloseIndexRequest(followIndex); - CheckedConsumer onResponse = response -> { + CheckedConsumer onResponse = response -> { updateSettingsAndOpenIndex(followIndex, updatedSettings, handler, onFailure); }; followerClient.admin().indices().close(closeRequest, ActionListener.wrap(onResponse, onFailure)); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java index 1efe5389d81b2..91b91ddd04f3c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; @@ -126,9 +127,9 @@ protected void masterOperation(Task task, TransportFreezeIndexAction.FreezeReque .masterNodeTimeout(request.masterNodeTimeout()) .indices(concreteIndices); - indexStateService.closeIndices(closeRequest, new ActionListener() { + indexStateService.closeIndices(closeRequest, new ActionListener() { @Override - public void onResponse(final AcknowledgedResponse response) { + public void onResponse(final CloseIndexResponse response) { if (response.isAcknowledged()) { toggleFrozenSettings(concreteIndices, request, listener); } else { From 9dbd7ac15d1cd478541672c1e712cdb601b82223 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 15 Feb 2019 14:10:40 +0100 Subject: [PATCH 3/6] Revert change in cat.indices --- .../test/cat.indices/10_basic.yml | 75 +------------------ 1 file changed, 1 insertion(+), 74 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.indices/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.indices/10_basic.yml index 7bb503d976d11..c7eddf42d1b03 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.indices/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.indices/10_basic.yml @@ -1,5 +1,5 @@ --- -"Test cat indices output (no indices)": +"Test cat indices output": - do: cat.indices: {} @@ -8,9 +8,6 @@ $body: | /^$/ ---- -"Test cat indices output": - - do: indices.create: index: index1 @@ -50,26 +47,9 @@ (\d\d\d\d\-\d\d\-\d\dT\d\d:\d\d:\d\d.\d\d\dZ) \s* ) $/ - ---- -"Test cat indices output for closed index (pre 8.0.0)": - - - skip: - version: "8.0.0 - " - reason: "closed indices are replicated starting version 8.0" - - - do: - indices.create: - index: index1 - body: - settings: - number_of_shards: "1" - number_of_replicas: "0" - - do: indices.close: index: index1 - - is_true: acknowledged - do: cat.indices: @@ -90,59 +70,6 @@ ) $/ ---- -"Test cat indices output for closed index": - - - skip: - version: " - 7.99.99" - reason: "closed indices are replicated starting version 8.0" - - - do: - indices.create: - index: index1 - body: - settings: - number_of_shards: "1" - number_of_replicas: "0" - - - do: - bulk: - body: - - '{"index": {"_index": "index1"}}' - - '{"f1": "1"}' - - '{"index": {"_index": "index1"}}' - - '{"f1": "2"}' - - '{"index": {"_index": "index1"}}' - - '{"f1": "3"}' - - '{"index": {"_index": "index1"}}' - - '{"f1": "4"}' - - '{"index": {"_index": "index1"}}' - - '{"f1": "5"}' - - - do: - indices.close: - index: index1 - - is_true: acknowledged - - - do: - cat.indices: - index: index* - - - match: - $body: | - /^( \s+ - close \s+ - index1 \s+ - ([a-zA-Z0-9=/_+]|[\\\-]){22} \s+ - \s+ - \s+ - 5 \s+ - 0 \s+ - (\d+|\d+[.]\d+)(kb|b) \s+ - (\d+|\d+[.]\d+)(kb|b) \s* - ) - $/ - --- "Test cat indices using health status": From 75479f6016d1f5b293df4558513cd1de6706e42c Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 15 Feb 2019 15:49:57 +0100 Subject: [PATCH 4/6] Adapt cat.indices YAML test to pass on replicated/non replicated clusters --- .../resources/rest-api-spec/test/cat.indices/10_basic.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.indices/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.indices/10_basic.yml index c7eddf42d1b03..1a0c31c3362e0 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.indices/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.indices/10_basic.yml @@ -63,10 +63,10 @@ ([a-zA-Z0-9=/_+]|[\\\-]){22} \s+ \s+ \s+ - \s+ - \s+ - \s+ - \s* + 0? \s+ # replicated closed indices report docs stats + 0? \s+ # replicated closed indices report docs stats + ((\d+|\d+[.]\d+)(kb|b))? \s+ # replicated closed indices can report store stats + ((\d+|\d+[.]\d+)(kb|b))? \s* # replicated closed indices can report store stats ) $/ From 109a3bd3abc2544576f690259c78ffbf669904a7 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 18 Feb 2019 14:37:56 +0100 Subject: [PATCH 5/6] Adapt CloseFollowerIndexStepTests --- .../resources/rest-api-spec/test/indices.open/10_basic.yml | 1 + .../core/indexlifecycle/CloseFollowerIndexStepTests.java | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml index fcb49acfddf1d..a389fee9bf761 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml @@ -81,3 +81,4 @@ wait_for_active_shards: all - is_true: acknowledged - match: { acknowledged: true } + - match: { shards_acknowledged: true } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java index 25e1c4e481bba..368afaa26d0cc 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java @@ -8,7 +8,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.client.IndicesAdminClient; @@ -43,8 +43,8 @@ public void testCloseFollowingIndex() { CloseIndexRequest closeIndexRequest = (CloseIndexRequest) invocation.getArguments()[0]; assertThat(closeIndexRequest.indices()[0], equalTo("follower-index")); @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocation.getArguments()[1]; - listener.onResponse(new AcknowledgedResponse(true)); + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + listener.onResponse(new CloseIndexResponse(true, true)); return null; }).when(indicesClient).close(Mockito.any(), Mockito.any()); From e559468f1ca8815b405c389f43c5637bfd7b6c80 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 26 Feb 2019 16:12:22 +0100 Subject: [PATCH 6/6] Fix CloseFollowerIndexIT --- .../org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java index 2f111727b08ee..7f93934fd91f8 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java @@ -26,6 +26,7 @@ import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.singletonMap; @@ -96,7 +97,10 @@ public void testCloseAndReopenFollowerIndex() throws Exception { } atLeastDocsIndexed(followerClient(), "index2", 32); - AcknowledgedResponse response = followerClient().admin().indices().close(new CloseIndexRequest("index2")).get(); + + CloseIndexRequest closeIndexRequest = new CloseIndexRequest("index2"); + closeIndexRequest.waitForActiveShards(ActiveShardCount.NONE); + AcknowledgedResponse response = followerClient().admin().indices().close(closeIndexRequest).get(); assertThat(response.isAcknowledged(), is(true)); ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); @@ -126,6 +130,6 @@ public void testCloseAndReopenFollowerIndex() throws Exception { followerSearchRequest.source().trackTotalHits(true); long followerIndexDocs = followerClient().search(followerSearchRequest).actionGet().getHits().getTotalHits().value; assertThat(followerIndexDocs, equalTo(leaderIndexDocs)); - }); + }, 30L, TimeUnit.SECONDS); } }