Skip to content

Commit ad499fc

Browse files
authored
[CCR] added rest specs and simple rest test for follow and unfollow apis (#30123)
[CCR] added rest specs and simple rest test for follow and unfollow apis, also Added an acknowledge field in follow and unfollow api responses. Currently these api return an empty response and fixed bug in unfollow api that didn't cleanup node tasks properly.
1 parent 2c73969 commit ad499fc

File tree

9 files changed

+143
-36
lines changed

9 files changed

+143
-36
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@
1010
import org.elasticsearch.action.ActionRequest;
1111
import org.elasticsearch.action.ActionRequestBuilder;
1212
import org.elasticsearch.action.ActionRequestValidationException;
13-
import org.elasticsearch.action.ActionResponse;
1413
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1514
import org.elasticsearch.action.support.ActionFilters;
1615
import org.elasticsearch.action.support.HandledTransportAction;
16+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1717
import org.elasticsearch.client.Client;
1818
import org.elasticsearch.client.ElasticsearchClient;
1919
import org.elasticsearch.cluster.ClusterState;
@@ -139,8 +139,26 @@ public static class RequestBuilder extends ActionRequestBuilder<Request, Respons
139139
}
140140
}
141141

142-
public static class Response extends ActionResponse {
142+
public static class Response extends AcknowledgedResponse {
143143

144+
Response() {
145+
}
146+
147+
Response(boolean acknowledged) {
148+
super(acknowledged);
149+
}
150+
151+
@Override
152+
public void readFrom(StreamInput in) throws IOException {
153+
super.readFrom(in);
154+
readAcknowledged(in);
155+
}
156+
157+
@Override
158+
public void writeTo(StreamOutput out) throws IOException {
159+
super.writeTo(out);
160+
writeAcknowledged(out);
161+
}
144162
}
145163

146164
public static class TransportAction extends HandledTransportAction<Request, Response> {
@@ -261,7 +279,7 @@ void finalizeResponse() {
261279

262280
if (error == null) {
263281
// include task ids?
264-
handler.onResponse(new Response());
282+
handler.onResponse(new Response(true));
265283
} else {
266284
// TODO: cancel all started tasks
267285
handler.onFailure(error);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

+5
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ public class ShardFollowNodeTask extends AllocatedPersistentTask {
2929
super(id, type, action, description, parentTask, headers);
3030
}
3131

32+
@Override
33+
protected void onCancelled() {
34+
markAsCompleted();
35+
}
36+
3237
void updateProcessedGlobalCheckpoint(long processedGlobalCheckpoint) {
3338
this.processedGlobalCheckpoint.set(processedGlobalCheckpoint);
3439
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/UnfollowIndexAction.java

+23-4
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@
1010
import org.elasticsearch.action.ActionRequest;
1111
import org.elasticsearch.action.ActionRequestBuilder;
1212
import org.elasticsearch.action.ActionRequestValidationException;
13-
import org.elasticsearch.action.ActionResponse;
1413
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1514
import org.elasticsearch.action.support.ActionFilters;
1615
import org.elasticsearch.action.support.HandledTransportAction;
16+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1717
import org.elasticsearch.client.Client;
1818
import org.elasticsearch.client.ElasticsearchClient;
1919
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -81,13 +81,32 @@ public void writeTo(StreamOutput out) throws IOException {
8181
}
8282
}
8383

84-
public static class Response extends ActionResponse {
84+
public static class Response extends AcknowledgedResponse {
85+
86+
Response(boolean acknowledged) {
87+
super(acknowledged);
88+
}
89+
90+
Response() {
91+
}
92+
93+
@Override
94+
public void readFrom(StreamInput in) throws IOException {
95+
super.readFrom(in);
96+
readAcknowledged(in);
97+
}
98+
99+
@Override
100+
public void writeTo(StreamOutput out) throws IOException {
101+
super.writeTo(out);
102+
writeAcknowledged(out);
103+
}
85104

86105
}
87106

88107
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
89108

90-
public RequestBuilder(ElasticsearchClient client) {
109+
RequestBuilder(ElasticsearchClient client) {
91110
super(client, INSTANCE, new Request());
92111
}
93112
}
@@ -147,7 +166,7 @@ void finalizeResponse() {
147166

148167
if (error == null) {
149168
// include task ids?
150-
listener.onResponse(new Response());
169+
listener.onResponse(new Response(true));
151170
} else {
152171
// TODO: cancel all started tasks
153172
listener.onFailure(error);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowExistingIndexAction.java

+3-15
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,16 @@
77

88
import org.elasticsearch.client.node.NodeClient;
99
import org.elasticsearch.common.settings.Settings;
10-
import org.elasticsearch.common.xcontent.XContentBuilder;
1110
import org.elasticsearch.rest.BaseRestHandler;
12-
import org.elasticsearch.rest.BytesRestResponse;
1311
import org.elasticsearch.rest.RestController;
1412
import org.elasticsearch.rest.RestRequest;
15-
import org.elasticsearch.rest.RestResponse;
16-
import org.elasticsearch.rest.RestStatus;
17-
import org.elasticsearch.rest.action.RestBuilderListener;
13+
import org.elasticsearch.rest.action.RestToXContentListener;
1814
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
1915

2016
import java.io.IOException;
2117

2218
import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.INSTANCE;
2319
import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Request;
24-
import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Response;
2520

2621
// TODO: change to confirm with API design
2722
public class RestFollowExistingIndexAction extends BaseRestHandler {
@@ -52,13 +47,6 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
5247
long value = Long.valueOf(restRequest.param(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName()));
5348
request.setProcessorMaxTranslogBytes(value);
5449
}
55-
return channel -> client.execute(INSTANCE, request, new RestBuilderListener<Response>(channel) {
56-
@Override
57-
public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {
58-
return new BytesRestResponse(RestStatus.OK, builder.startObject()
59-
.endObject());
60-
61-
}
62-
});
50+
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
6351
}
64-
}
52+
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestUnfollowIndexAction.java

+2-14
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,15 @@
77

88
import org.elasticsearch.client.node.NodeClient;
99
import org.elasticsearch.common.settings.Settings;
10-
import org.elasticsearch.common.xcontent.XContentBuilder;
1110
import org.elasticsearch.rest.BaseRestHandler;
12-
import org.elasticsearch.rest.BytesRestResponse;
1311
import org.elasticsearch.rest.RestController;
1412
import org.elasticsearch.rest.RestRequest;
15-
import org.elasticsearch.rest.RestResponse;
16-
import org.elasticsearch.rest.RestStatus;
17-
import org.elasticsearch.rest.action.RestBuilderListener;
13+
import org.elasticsearch.rest.action.RestToXContentListener;
1814

1915
import java.io.IOException;
2016

2117
import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.INSTANCE;
2218
import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.Request;
23-
import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.Response;
2419

2520
// TODO: change to confirm with API design
2621
public class RestUnfollowIndexAction extends BaseRestHandler {
@@ -40,13 +35,6 @@ public String getName() {
4035
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
4136
Request request = new Request();
4237
request.setFollowIndex(restRequest.param("follow_index"));
43-
return channel -> client.execute(INSTANCE, request, new RestBuilderListener<Response>(channel) {
44-
@Override
45-
public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {
46-
return new BytesRestResponse(RestStatus.OK, builder.startObject()
47-
.endObject());
48-
49-
}
50-
});
38+
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
5139
}
5240
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java

+12
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.ccr;
77

8+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
89
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
910
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
1011
import org.elasticsearch.action.admin.indices.stats.ShardStats;
@@ -204,6 +205,17 @@ public void testFollowIndex() throws Exception {
204205
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
205206
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
206207
assertThat(tasks.tasks().size(), equalTo(0));
208+
209+
ListTasksRequest listTasksRequest = new ListTasksRequest();
210+
listTasksRequest.setDetailed(true);
211+
ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).get();
212+
int numNodeTasks = 0;
213+
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
214+
if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) {
215+
numNodeTasks++;
216+
}
217+
}
218+
assertThat(numNodeTasks, equalTo(0));
207219
});
208220
}
209221

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"xpack.ccr.follow_existing_index": {
3+
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
4+
"methods": [ "POST" ],
5+
"url": {
6+
"path": "/_xpack/ccr/{follow_index}/_follow",
7+
"paths": [ "/_xpack/ccr/{follow_index}/_follow" ],
8+
"parts": {
9+
"follow_index": {
10+
"type": "string",
11+
"required": true,
12+
"description": "The name of the index that follows to leader index."
13+
}
14+
},
15+
"params": {
16+
"leader_index": {
17+
"type": "string",
18+
"required": true,
19+
"description": "The name of the index to read the changes from."
20+
}
21+
}
22+
}
23+
}
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"xpack.ccr.unfollow_index": {
3+
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
4+
"methods": [ "POST" ],
5+
"url": {
6+
"path": "/_xpack/ccr/{follow_index}/_unfollow",
7+
"paths": [ "/_xpack/ccr/{follow_index}/_unfollow" ],
8+
"parts": {
9+
"follow_index": {
10+
"type": "string",
11+
"required": true,
12+
"description": "The name of the follow index that should stop following its leader index."
13+
}
14+
},
15+
"params": {
16+
}
17+
}
18+
}
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
---
2+
"Test follow and unfollow an existing index":
3+
- do:
4+
indices.create:
5+
index: foo
6+
body:
7+
mappings:
8+
doc:
9+
properties:
10+
field:
11+
type: keyword
12+
- is_true: acknowledged
13+
14+
- do:
15+
indices.create:
16+
index: bar
17+
body:
18+
mappings:
19+
doc:
20+
properties:
21+
field:
22+
type: keyword
23+
- is_true: acknowledged
24+
25+
- do:
26+
xpack.ccr.follow_existing_index:
27+
leader_index: foo
28+
follow_index: bar
29+
- is_true: acknowledged
30+
31+
- do:
32+
xpack.ccr.unfollow_index:
33+
follow_index: bar
34+
- is_true: acknowledged

0 commit comments

Comments
 (0)