Skip to content

Commit 4dbf498

Browse files
authored
[Rollup] Job deletion should be invoked on the allocated task (#34574)
We should delete a job by directly talking to the allocated task and telling it to shut down. Today we shut down a job via the persistent task framework. This is not ideal because, while the job has been removed from the persistent task cluster state (CS), the allocated task continues to live until it gets the shutdown message. This means a user can delete a job, immediately delete the rollup index, and then see new documents appear in the just-deleted index. This happens because the indexer in the allocated task is still running and indexes a few more documents before getting the shutdown command. In this PR, the transport action is changed to a TransportTasksAction, and we invoke onCancelled() directly on the matching job. The race condition still exists after this PR (albeit less likely), but this was a precursor to fixing the issue and a self-contained chunk of code. A second PR will follow up to fix the race itself.
1 parent 46b49b0 commit 4dbf498

File tree

10 files changed

+249
-186
lines changed

10 files changed

+249
-186
lines changed

docs/reference/rollup/apis/delete-job.asciidoc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88

99
experimental[]
1010

11-
This API deletes an existing rollup job. The job can be started or stopped, in both cases it will be deleted. Attempting
12-
to delete a non-existing job will throw an exception
11+
This API deletes an existing rollup job. A job must be *stopped* before it can be deleted. Attempting to delete
12+
a started job will result in an error. Similarly, attempting to delete a nonexistent job will throw an exception.
1313

1414
.Deleting the job does not delete rolled up data
1515
**********************************
@@ -99,12 +99,12 @@ A 404 `resource_not_found` exception will be thrown:
9999
"root_cause" : [
100100
{
101101
"type" : "resource_not_found_exception",
102-
"reason" : "the task with id does_not_exist doesn't exist",
102+
"reason" : "the task with id [does_not_exist] doesn't exist",
103103
"stack_trace": ...
104104
}
105105
],
106106
"type" : "resource_not_found_exception",
107-
"reason" : "the task with id does_not_exist doesn't exist",
107+
"reason" : "the task with id [does_not_exist] doesn't exist",
108108
"stack_trace": ...
109109
},
110110
"status": 404

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@
5151
*/
5252
public class ListTasksResponse extends BaseTasksResponse implements ToXContentObject {
5353
private static final String TASKS = "tasks";
54-
private static final String TASK_FAILURES = "task_failures";
55-
private static final String NODE_FAILURES = "node_failures";
5654

5755
private List<TaskInfo> tasks;
5856

@@ -246,28 +244,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
246244
return builder;
247245
}
248246

249-
private void toXContentCommon(XContentBuilder builder, Params params) throws IOException {
250-
if (getTaskFailures() != null && getTaskFailures().size() > 0) {
251-
builder.startArray(TASK_FAILURES);
252-
for (TaskOperationFailure ex : getTaskFailures()){
253-
builder.startObject();
254-
builder.value(ex);
255-
builder.endObject();
256-
}
257-
builder.endArray();
258-
}
259-
260-
if (getNodeFailures() != null && getNodeFailures().size() > 0) {
261-
builder.startArray(NODE_FAILURES);
262-
for (ElasticsearchException ex : getNodeFailures()) {
263-
builder.startObject();
264-
ex.toXContent(builder, params);
265-
builder.endObject();
266-
}
267-
builder.endArray();
268-
}
269-
}
270-
271247
public static ListTasksResponse fromXContent(XContentParser parser) {
272248
return PARSER.apply(parser, null);
273249
}

server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@
2525
import org.elasticsearch.action.TaskOperationFailure;
2626
import org.elasticsearch.common.io.stream.StreamInput;
2727
import org.elasticsearch.common.io.stream.StreamOutput;
28+
import org.elasticsearch.common.xcontent.ToXContent;
29+
import org.elasticsearch.common.xcontent.XContentBuilder;
2830
import org.elasticsearch.tasks.TaskId;
2931

3032
import java.io.IOException;
3133
import java.util.ArrayList;
3234
import java.util.Collections;
3335
import java.util.List;
36+
import java.util.Objects;
3437
import java.util.stream.Stream;
3538

3639
import static java.util.stream.Collectors.toList;
@@ -41,6 +44,9 @@
4144
* Base class for responses of task-related operations
4245
*/
4346
public class BaseTasksResponse extends ActionResponse {
47+
protected static final String TASK_FAILURES = "task_failures";
48+
protected static final String NODE_FAILURES = "node_failures";
49+
4450
private List<TaskOperationFailure> taskFailures;
4551
private List<ElasticsearchException> nodeFailures;
4652

@@ -103,4 +109,44 @@ public void writeTo(StreamOutput out) throws IOException {
103109
exp.writeTo(out);
104110
}
105111
}
112+
113+
protected void toXContentCommon(XContentBuilder builder, ToXContent.Params params) throws IOException {
114+
if (getTaskFailures() != null && getTaskFailures().size() > 0) {
115+
builder.startArray(TASK_FAILURES);
116+
for (TaskOperationFailure ex : getTaskFailures()){
117+
builder.startObject();
118+
builder.value(ex);
119+
builder.endObject();
120+
}
121+
builder.endArray();
122+
}
123+
124+
if (getNodeFailures() != null && getNodeFailures().size() > 0) {
125+
builder.startArray(NODE_FAILURES);
126+
for (ElasticsearchException ex : getNodeFailures()) {
127+
builder.startObject();
128+
ex.toXContent(builder, params);
129+
builder.endObject();
130+
}
131+
builder.endArray();
132+
}
133+
}
134+
135+
@Override
136+
public boolean equals(Object o) {
137+
if (this == o) {
138+
return true;
139+
}
140+
if (o == null || getClass() != o.getClass()) {
141+
return false;
142+
}
143+
BaseTasksResponse response = (BaseTasksResponse) o;
144+
return taskFailures.equals(response.taskFailures)
145+
&& nodeFailures.equals(response.nodeFailures);
146+
}
147+
148+
@Override
149+
public int hashCode() {
150+
return Objects.hash(taskFailures, nodeFailures);
151+
}
106152
}

test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import java.security.NoSuchAlgorithmException;
6969
import java.security.cert.CertificateException;
7070
import java.util.ArrayList;
71+
import java.util.Arrays;
7172
import java.util.HashSet;
7273
import java.util.List;
7374
import java.util.Map;
@@ -449,7 +450,7 @@ private void wipeClusterSettings() throws IOException {
449450
}
450451
}
451452

452-
private void wipeRollupJobs() throws IOException {
453+
private void wipeRollupJobs() throws IOException, InterruptedException {
453454
Response response = adminClient().performRequest(new Request("GET", "/_xpack/rollup/job/_all"));
454455
Map<String, Object> jobs = entityAsMap(response);
455456
@SuppressWarnings("unchecked")
@@ -460,6 +461,29 @@ private void wipeRollupJobs() throws IOException {
460461
return;
461462
}
462463

464+
for (Map<String, Object> jobConfig : jobConfigs) {
465+
@SuppressWarnings("unchecked")
466+
String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
467+
Request request = new Request("POST", "/_xpack/rollup/job/" + jobId + "/_stop");
468+
request.addParameter("ignore", "404");
469+
logger.debug("stopping rollup job [{}]", jobId);
470+
adminClient().performRequest(request);
471+
}
472+
473+
// TODO this is temporary until StopJob API gains the ability to block until stopped
474+
awaitBusy(() -> {
475+
Request request = new Request("GET", "/_xpack/rollup/job/_all");
476+
try {
477+
Response jobsResponse = adminClient().performRequest(request);
478+
String body = EntityUtils.toString(jobsResponse.getEntity());
479+
logger.error(body);
480+
// If the body contains any of the non-stopped states, at least one job is not finished yet
481+
return Arrays.stream(new String[]{"started", "aborting", "stopping", "indexing"}).noneMatch(body::contains);
482+
} catch (IOException e) {
483+
return false;
484+
}
485+
}, 10, TimeUnit.SECONDS);
486+
463487
for (Map<String, Object> jobConfig : jobConfigs) {
464488
@SuppressWarnings("unchecked")
465489
String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java

Lines changed: 87 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,29 @@
77

88

99
import org.elasticsearch.action.Action;
10+
import org.elasticsearch.action.ActionRequestBuilder;
1011
import org.elasticsearch.action.ActionRequestValidationException;
11-
import org.elasticsearch.action.support.master.AcknowledgedRequest;
12-
import org.elasticsearch.action.support.master.AcknowledgedResponse;
13-
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
12+
import org.elasticsearch.action.FailedNodeException;
13+
import org.elasticsearch.action.TaskOperationFailure;
14+
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
15+
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
1416
import org.elasticsearch.client.ElasticsearchClient;
1517
import org.elasticsearch.common.io.stream.StreamInput;
1618
import org.elasticsearch.common.io.stream.StreamOutput;
17-
import org.elasticsearch.common.xcontent.ToXContent;
19+
import org.elasticsearch.common.io.stream.Writeable;
20+
import org.elasticsearch.common.xcontent.ToXContentFragment;
21+
import org.elasticsearch.common.xcontent.ToXContentObject;
1822
import org.elasticsearch.common.xcontent.XContentBuilder;
23+
import org.elasticsearch.tasks.Task;
1924
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2025
import org.elasticsearch.xpack.core.rollup.RollupField;
2126

2227
import java.io.IOException;
28+
import java.util.Collections;
29+
import java.util.List;
2330
import java.util.Objects;
2431

25-
public class DeleteRollupJobAction extends Action<AcknowledgedResponse> {
32+
public class DeleteRollupJobAction extends Action<DeleteRollupJobAction.Response> {
2633

2734
public static final DeleteRollupJobAction INSTANCE = new DeleteRollupJobAction();
2835
public static final String NAME = "cluster:admin/xpack/rollup/delete";
@@ -32,11 +39,11 @@ private DeleteRollupJobAction() {
3239
}
3340

3441
@Override
35-
public AcknowledgedResponse newResponse() {
36-
return new AcknowledgedResponse();
42+
public Response newResponse() {
43+
return new Response();
3744
}
3845

39-
public static class Request extends AcknowledgedRequest<Request> implements ToXContent {
46+
public static class Request extends BaseTasksRequest<Request> implements ToXContentFragment {
4047
private String id;
4148

4249
public Request(String id) {
@@ -45,6 +52,11 @@ public Request(String id) {
4552

4653
public Request() {}
4754

55+
@Override
56+
public boolean match(Task task) {
57+
return task.getDescription().equals(RollupField.NAME + "_" + id);
58+
}
59+
4860
public String getId() {
4961
return id;
5062
}
@@ -90,10 +102,74 @@ public boolean equals(Object obj) {
90102
}
91103
}
92104

93-
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, AcknowledgedResponse, RequestBuilder> {
94-
105+
public static class RequestBuilder extends ActionRequestBuilder<DeleteRollupJobAction.Request, DeleteRollupJobAction.Response> {
95106
protected RequestBuilder(ElasticsearchClient client, DeleteRollupJobAction action) {
96-
super(client, action, new Request());
107+
super(client, action, new DeleteRollupJobAction.Request());
108+
}
109+
}
110+
111+
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
112+
113+
private boolean acknowledged;
114+
115+
public Response(StreamInput in) throws IOException {
116+
super(Collections.emptyList(), Collections.emptyList());
117+
readFrom(in);
118+
}
119+
120+
public Response(boolean acknowledged, List<TaskOperationFailure> taskFailures, List<FailedNodeException> nodeFailures) {
121+
super(taskFailures, nodeFailures);
122+
this.acknowledged = acknowledged;
123+
}
124+
125+
public Response(boolean acknowledged) {
126+
super(Collections.emptyList(), Collections.emptyList());
127+
this.acknowledged = acknowledged;
128+
}
129+
130+
public Response() {
131+
super(Collections.emptyList(), Collections.emptyList());
132+
this.acknowledged = false;
133+
}
134+
135+
public boolean isDeleted() {
136+
return acknowledged;
137+
}
138+
139+
@Override
140+
public void readFrom(StreamInput in) throws IOException {
141+
super.readFrom(in);
142+
acknowledged = in.readBoolean();
143+
}
144+
145+
@Override
146+
public void writeTo(StreamOutput out) throws IOException {
147+
super.writeTo(out);
148+
out.writeBoolean(acknowledged);
149+
}
150+
151+
@Override
152+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
153+
builder.startObject();
154+
{
155+
toXContentCommon(builder, params);
156+
builder.field("acknowledged", acknowledged);
157+
}
158+
builder.endObject();
159+
return builder;
160+
}
161+
162+
@Override
163+
public boolean equals(Object o) {
164+
if (this == o) return true;
165+
if (o == null || getClass() != o.getClass()) return false;
166+
DeleteRollupJobAction.Response response = (DeleteRollupJobAction.Response) o;
167+
return super.equals(o) && acknowledged == response.acknowledged;
168+
}
169+
170+
@Override
171+
public int hashCode() {
172+
return Objects.hash(super.hashCode(), acknowledged);
97173
}
98174
}
99175
}

0 commit comments

Comments
 (0)