Skip to content

Commit 2ee14e3

Browse files
committed
Allow index patterns
1 parent e3fc87a commit 2ee14e3

File tree

3 files changed

+67
-13
lines changed

3 files changed

+67
-13
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrStatsAction.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
import org.elasticsearch.action.Action;
1010
import org.elasticsearch.action.ActionRequestValidationException;
1111
import org.elasticsearch.action.FailedNodeException;
12+
import org.elasticsearch.action.IndicesRequest;
1213
import org.elasticsearch.action.TaskOperationFailure;
14+
import org.elasticsearch.action.support.IndicesOptions;
1315
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
1416
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
1517
import org.elasticsearch.common.io.stream.StreamInput;
@@ -83,21 +85,41 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
8385
}
8486
}
8587

86-
public static class TasksRequest extends BaseTasksRequest<TasksRequest> {
88+
public static class TasksRequest extends BaseTasksRequest<TasksRequest> implements IndicesRequest {
8789

88-
private String indexName;
90+
private String[] indices;
8991

90-
public void setIndexName(final String indexName) {
91-
this.indexName = indexName;
92+
@Override
93+
public String[] indices() {
94+
return indices;
95+
}
96+
97+
public void setIndices(final String[] indices) {
98+
this.indices = indices;
99+
}
100+
101+
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed();
102+
103+
@Override
104+
public IndicesOptions indicesOptions() {
105+
return indicesOptions;
106+
}
107+
108+
public void setIndicesOptions(final IndicesOptions indicesOptions) {
109+
this.indicesOptions = indicesOptions;
92110
}
93111

94112
@Override
95113
public boolean match(final Task task) {
96-
if (task instanceof ShardFollowNodeTask) {
97-
final ShardFollowNodeTask shardTask = (ShardFollowNodeTask) task;
98-
return indexName.equals("") || indexName.equals("_all") || shardTask.getFollowShardId().getIndexName().equals(indexName);
99-
}
100-
return false;
114+
/*
115+
* This is a limitation of the current tasks API. When the transport action is executed, the tasks API invokes this match method
116+
* to find the tasks on which to execute the task-level operation (see TransportTasksAction#nodeOperation and
117+
* TransportTasksAction#taskOperation). If we do the matching here, then we can not match index patterns. Therefore, we defer
118+
* deciding whether or not the task matches the request to the transport action (see TransportCcrStatsAction#taskOperation)
119+
* where we can decide on the basis of the cluster state whether or not the task matches any of the index patterns in the
120+
* request.
121+
*/
122+
return task instanceof ShardFollowNodeTask;
101123
}
102124

103125
@Override
@@ -108,13 +130,15 @@ public ActionRequestValidationException validate() {
108130
@Override
109131
public void readFrom(final StreamInput in) throws IOException {
110132
super.readFrom(in);
111-
indexName = in.readOptionalString();
133+
indices = in.readStringArray();
134+
indicesOptions = IndicesOptions.readIndicesOptions(in);
112135
}
113136

114137
@Override
115138
public void writeTo(StreamOutput out) throws IOException {
116139
super.writeTo(out);
117-
out.writeOptionalString(indexName);
140+
out.writeStringArray(indices);
141+
indicesOptions.writeIndicesOptions(out);
118142
}
119143

120144
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,39 @@
1111
import org.elasticsearch.action.TaskOperationFailure;
1212
import org.elasticsearch.action.support.ActionFilters;
1313
import org.elasticsearch.action.support.tasks.TransportTasksAction;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.block.ClusterBlockException;
16+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1417
import org.elasticsearch.cluster.service.ClusterService;
1518
import org.elasticsearch.common.inject.Inject;
1619
import org.elasticsearch.common.io.stream.StreamInput;
1720
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
22+
import org.elasticsearch.tasks.Task;
1823
import org.elasticsearch.transport.TransportService;
1924
import org.elasticsearch.xpack.ccr.Ccr;
2025

2126
import java.io.IOException;
27+
import java.util.Arrays;
28+
import java.util.HashSet;
2229
import java.util.List;
30+
import java.util.Set;
31+
import java.util.function.Consumer;
2332

2433
public class TransportCcrStatsAction extends TransportTasksAction<
2534
ShardFollowNodeTask,
2635
CcrStatsAction.TasksRequest,
2736
CcrStatsAction.TasksResponse, CcrStatsAction.TaskResponse> {
2837

38+
private final IndexNameExpressionResolver resolver;
39+
2940
@Inject
3041
public TransportCcrStatsAction(
3142
final Settings settings,
3243
final ClusterService clusterService,
3344
final TransportService transportService,
34-
final ActionFilters actionFilters) {
45+
final ActionFilters actionFilters,
46+
final IndexNameExpressionResolver resolver) {
3547
super(
3648
settings,
3749
CcrStatsAction.NAME,
@@ -41,6 +53,7 @@ public TransportCcrStatsAction(
4153
CcrStatsAction.TasksRequest::new,
4254
CcrStatsAction.TasksResponse::new,
4355
Ccr.CCR_THREAD_POOL_NAME);
56+
this.resolver = resolver;
4457
}
4558

4659
@Override
@@ -57,6 +70,20 @@ protected CcrStatsAction.TaskResponse readTaskResponse(final StreamInput in) thr
5770
return new CcrStatsAction.TaskResponse(in);
5871
}
5972

73+
@Override
74+
protected void processTasks(final CcrStatsAction.TasksRequest request, final Consumer<ShardFollowNodeTask> operation) {
75+
final ClusterState state = clusterService.state();
76+
final Set<String> concreteIndices = new HashSet<>(Arrays.asList(resolver.concreteIndexNames(state, request)));
77+
for (final Task task : taskManager.getTasks().values()) {
78+
if (task instanceof ShardFollowNodeTask) {
79+
final ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
80+
if (concreteIndices.contains(shardFollowNodeTask.getFollowShardId().getIndexName())) {
81+
operation.accept(shardFollowNodeTask);
82+
}
83+
}
84+
}
85+
}
86+
6087
@Override
6188
protected void taskOperation(
6289
final CcrStatsAction.TasksRequest request,

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
package org.elasticsearch.xpack.ccr.rest;
88

9+
import org.elasticsearch.action.support.IndicesOptions;
910
import org.elasticsearch.client.node.NodeClient;
11+
import org.elasticsearch.common.Strings;
1012
import org.elasticsearch.common.settings.Settings;
1113
import org.elasticsearch.rest.BaseRestHandler;
1214
import org.elasticsearch.rest.RestController;
@@ -32,7 +34,8 @@ public String getName() {
3234
@Override
3335
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
3436
final CcrStatsAction.TasksRequest request = new CcrStatsAction.TasksRequest();
35-
request.setIndexName(restRequest.param("index", "_all"));
37+
request.setIndices(Strings.splitStringByCommaToArray(restRequest.param("index")));
38+
request.setIndicesOptions(IndicesOptions.fromRequest(restRequest, request.indicesOptions()));
3639
return channel -> client.execute(CcrStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
3740
}
3841

0 commit comments

Comments
 (0)