Skip to content

Wait for shards to be active after closing indices #38854

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
"options" : ["open","closed","none","all"],
"default" : "open",
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"wait_for_active_shards": {
"type" : "string",
"description" : "Sets the number of active shards to wait for before the operation returns."
}
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
"Test cat indices output":
"Test cat indices output (no indices)":

- do:
cat.indices: {}
Expand All @@ -8,6 +8,9 @@
$body: |
/^$/

---
"Test cat indices output":

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is split because replicated closed indices can report docs stats while non-replicated closes indices can't

- do:
indices.create:
index: index1
Expand Down Expand Up @@ -47,9 +50,26 @@
(\d\d\d\d\-\d\d\-\d\dT\d\d:\d\d:\d\d.\d\d\dZ) \s*
)
$/

---
"Test cat indices output for closed index (pre 8.0.0)":

- skip:
version: "8.0.0 - "
reason: "closed indices are replicated starting version 8.0"

- do:
indices.create:
index: index1
body:
settings:
number_of_shards: "1"
number_of_replicas: "0"

- do:
indices.close:
index: index1
- is_true: acknowledged

- do:
cat.indices:
Expand All @@ -70,6 +90,59 @@
)
$/

---
"Test cat indices output for closed index":

- skip:
version: " - 7.99.99"
reason: "closed indices are replicated starting version 8.0"

- do:
indices.create:
index: index1
body:
settings:
number_of_shards: "1"
number_of_replicas: "0"

- do:
bulk:
body:
- '{"index": {"_index": "index1"}}'
- '{"f1": "1"}'
- '{"index": {"_index": "index1"}}'
- '{"f1": "2"}'
- '{"index": {"_index": "index1"}}'
- '{"f1": "3"}'
- '{"index": {"_index": "index1"}}'
- '{"f1": "4"}'
- '{"index": {"_index": "index1"}}'
- '{"f1": "5"}'

- do:
indices.close:
index: index1
- is_true: acknowledged

- do:
cat.indices:
index: index*

- match:
$body: |
/^( \s+
close \s+
index1 \s+
([a-zA-Z0-9=/_+]|[\\\-]){22} \s+
\s+
\s+
5 \s+
0 \s+
(\d+|\d+[.]\d+)(kb|b) \s+
(\d+|\d+[.]\d+)(kb|b) \s*
)
$/

---
"Test cat indices using health status":

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- do:
indices.close:
index: test_index
- is_true: acknowledged

- do:
catch: bad_request
Expand All @@ -24,6 +25,7 @@
- do:
indices.open:
index: test_index
- is_true: acknowledged

- do:
cluster.health:
Expand All @@ -50,11 +52,32 @@
- do:
indices.close:
index: test_index
- is_true: acknowledged

- do:
indices.open:
index: test_index
wait_for_active_shards: all

- is_true: acknowledged
- match: { acknowledged: true }
- match: { shards_acknowledged: true }

---
"Close index with wait_for_active_shards set to all":
- skip:
version: " - 7.99.99"
reason: "closed indices are replicated starting version 8.0"

- do:
indices.create:
index: test_index
body:
settings:
number_of_replicas: 0

- do:
indices.close:
index: test_index
wait_for_active_shards: all
- is_true: acknowledged
- match: { acknowledged: true }
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ setup:
- do:
indices.close:
index: _all
- is_true: acknowledged

- do:
catch: bad_request
Expand All @@ -36,6 +37,7 @@ setup:
- do:
indices.open:
index: _all
- is_true: acknowledged

- do:
cluster.health:
Expand All @@ -51,6 +53,7 @@ setup:
- do:
indices.close:
index: test_*
- is_true: acknowledged

- do:
catch: bad_request
Expand All @@ -61,6 +64,7 @@ setup:
- do:
indices.open:
index: test_*
- is_true: acknowledged

- do:
cluster.health:
Expand All @@ -76,6 +80,7 @@ setup:
- do:
indices.close:
index: '*'
- is_true: acknowledged

- do:
catch: bad_request
Expand All @@ -86,6 +91,7 @@ setup:
- do:
indices.open:
index: '*'
- is_true: acknowledged

- do:
cluster.health:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
*/
package org.elasticsearch.action.admin.indices.close;

import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;

/**
* Cluster state update request that allows to close one or more indices
*/
public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<CloseIndexClusterStateUpdateRequest> {

private final long taskId;
private long taskId;
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;

public CloseIndexClusterStateUpdateRequest(final long taskId) {
this.taskId = taskId;
Expand All @@ -34,4 +36,18 @@ public CloseIndexClusterStateUpdateRequest(final long taskId) {
public long taskId() {
return taskId;
}

public CloseIndexClusterStateUpdateRequest taskId(final long taskId) {
this.taskId = taskId;
return this;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}

public CloseIndexClusterStateUpdateRequest waitForActiveShards(final ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.elasticsearch.action.admin.indices.close;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -38,6 +40,7 @@ public class CloseIndexRequest extends AcknowledgedRequest<CloseIndexRequest> im

private String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;

public CloseIndexRequest() {
}
Expand Down Expand Up @@ -101,17 +104,34 @@ public CloseIndexRequest indicesOptions(IndicesOptions indicesOptions) {
return this;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}

public CloseIndexRequest waitForActiveShards(final ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
waitForActiveShards = ActiveShardCount.readFrom(in);
} else {
waitForActiveShards = ActiveShardCount.NONE;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
waitForActiveShards.writeTo(out);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.close;

import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -60,4 +61,31 @@ public CloseIndexRequestBuilder setIndicesOptions(IndicesOptions indicesOptions)
request.indicesOptions(indicesOptions);
return this;
}

/**
* Sets the number of shard copies that should be active for indices closing to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* wait for all shards (primary and all replicas) to be active before returning.
* Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Indices closing will only wait up until the timeout value for the number of shard copies
* to be active before returning.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public CloseIndexRequestBuilder setWaitForActiveShards(final ActiveShardCount waitForActiveShards) {
request.waitForActiveShards(waitForActiveShards);
return this;
}

/**
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public CloseIndexRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ protected void masterOperation(final Task task, final CloseIndexRequest request,
final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.waitForActiveShards(request.waitForActiveShards())
.indices(concreteIndices);

indexStateService.closeIndices(closeRequest, new ActionListener<AcknowledgedResponse>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,26 @@ public void onFailure(final String source, final Exception e) {
@Override
public void clusterStateProcessed(final String source,
final ClusterState oldState, final ClusterState newState) {
listener.onResponse(new AcknowledgedResponse(acknowledged));

final AcknowledgedResponse response = new AcknowledgedResponse(acknowledged);
final String[] indices = results.entrySet().stream()
.filter(result -> result.getValue().isAcknowledged())
.map(result -> result.getKey().getName())
.filter(index -> newState.routingTable().hasIndex(index))
.toArray(String[]::new);

if (indices.length > 0) {
activeShardsObserver.waitForActiveShards(indices, request.waitForActiveShards(),
request.ackTimeout(), shardsAcknowledged -> {
if (shardsAcknowledged == false) {
logger.debug("[{}] indices closed, but the operation timed out while waiting " +
"for enough shards to be started.", Arrays.toString(indices));
}
listener.onResponse(response);
}, listener::onFailure);
} else {
listener.onResponse(response);
}
}
}),
listener::onFailure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.rest.action.admin.indices;

import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -49,6 +50,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
closeIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", closeIndexRequest.masterNodeTimeout()));
closeIndexRequest.timeout(request.paramAsTime("timeout", closeIndexRequest.timeout()));
closeIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, closeIndexRequest.indicesOptions()));
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
closeIndexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
return channel -> client.admin().indices().close(closeIndexRequest, new RestToXContentListener<>(channel));
}

Expand Down
Loading