Skip to content

Commit 9622f2e

Browse files
committed
[Rollup] Job deletion should be invoked on the allocated task (#34574)
We should delete a job by directly talking to the allocated task and telling it to shutdown. Today we shut down a job via the persistent task framework. This is not ideal because, while the job has been removed from the persistent task CS, the allocated task continues to live until it gets the shutdown message. This means a user can delete a job, immediately delete the rollup index, and then see new documents appear in the just-deleted index. This happens because the indexer in the allocated task is still running and indexes a few more documents before getting the shutdown command. In this PR, the transport action is changed to a TransportTasksAction, and we invoke onCancelled() directly on the matching job. The race condition still exists after this PR (albeit less likely), but this was a precursor to fixing the issue and a self-contained chunk of code. A second PR will followup to fix the race itself.
1 parent b4c665b commit 9622f2e

File tree

10 files changed

+250
-184
lines changed

10 files changed

+250
-184
lines changed

docs/reference/rollup/apis/delete-job.asciidoc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88

99
experimental[]
1010

11-
This API deletes an existing rollup job. The job can be started or stopped, in both cases it will be deleted. Attempting
12-
to delete a non-existing job will throw an exception
11+
This API deletes an existing rollup job. A job must be *stopped* first before it can be deleted. Attempting to delete
12+
a started job will result in an error. Similarly, attempting to delete a nonexistent job will throw an exception.
1313

1414
.Deleting the job does not delete rolled up data
1515
**********************************
@@ -99,12 +99,12 @@ A 404 `resource_not_found` exception will be thrown:
9999
"root_cause" : [
100100
{
101101
"type" : "resource_not_found_exception",
102-
"reason" : "the task with id does_not_exist doesn't exist",
102+
"reason" : "the task with id [does_not_exist] doesn't exist",
103103
"stack_trace": ...
104104
}
105105
],
106106
"type" : "resource_not_found_exception",
107-
"reason" : "the task with id does_not_exist doesn't exist",
107+
"reason" : "the task with id [does_not_exist] doesn't exist",
108108
"stack_trace": ...
109109
},
110110
"status": 404

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@
5151
*/
5252
public class ListTasksResponse extends BaseTasksResponse implements ToXContentObject {
5353
private static final String TASKS = "tasks";
54-
private static final String TASK_FAILURES = "task_failures";
55-
private static final String NODE_FAILURES = "node_failures";
5654

5755
private List<TaskInfo> tasks;
5856

@@ -246,28 +244,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
246244
return builder;
247245
}
248246

249-
private void toXContentCommon(XContentBuilder builder, Params params) throws IOException {
250-
if (getTaskFailures() != null && getTaskFailures().size() > 0) {
251-
builder.startArray(TASK_FAILURES);
252-
for (TaskOperationFailure ex : getTaskFailures()){
253-
builder.startObject();
254-
builder.value(ex);
255-
builder.endObject();
256-
}
257-
builder.endArray();
258-
}
259-
260-
if (getNodeFailures() != null && getNodeFailures().size() > 0) {
261-
builder.startArray(NODE_FAILURES);
262-
for (ElasticsearchException ex : getNodeFailures()) {
263-
builder.startObject();
264-
ex.toXContent(builder, params);
265-
builder.endObject();
266-
}
267-
builder.endArray();
268-
}
269-
}
270-
271247
public static ListTasksResponse fromXContent(XContentParser parser) {
272248
return PARSER.apply(parser, null);
273249
}

server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@
2525
import org.elasticsearch.action.TaskOperationFailure;
2626
import org.elasticsearch.common.io.stream.StreamInput;
2727
import org.elasticsearch.common.io.stream.StreamOutput;
28+
import org.elasticsearch.common.xcontent.ToXContent;
29+
import org.elasticsearch.common.xcontent.XContentBuilder;
2830
import org.elasticsearch.tasks.TaskId;
2931

3032
import java.io.IOException;
3133
import java.util.ArrayList;
3234
import java.util.Collections;
3335
import java.util.List;
36+
import java.util.Objects;
3437
import java.util.stream.Stream;
3538

3639
import static java.util.stream.Collectors.toList;
@@ -41,6 +44,9 @@
4144
* Base class for responses of task-related operations
4245
*/
4346
public class BaseTasksResponse extends ActionResponse {
47+
protected static final String TASK_FAILURES = "task_failures";
48+
protected static final String NODE_FAILURES = "node_failures";
49+
4450
private List<TaskOperationFailure> taskFailures;
4551
private List<ElasticsearchException> nodeFailures;
4652

@@ -103,4 +109,44 @@ public void writeTo(StreamOutput out) throws IOException {
103109
exp.writeTo(out);
104110
}
105111
}
112+
113+
protected void toXContentCommon(XContentBuilder builder, ToXContent.Params params) throws IOException {
114+
if (getTaskFailures() != null && getTaskFailures().size() > 0) {
115+
builder.startArray(TASK_FAILURES);
116+
for (TaskOperationFailure ex : getTaskFailures()){
117+
builder.startObject();
118+
builder.value(ex);
119+
builder.endObject();
120+
}
121+
builder.endArray();
122+
}
123+
124+
if (getNodeFailures() != null && getNodeFailures().size() > 0) {
125+
builder.startArray(NODE_FAILURES);
126+
for (ElasticsearchException ex : getNodeFailures()) {
127+
builder.startObject();
128+
ex.toXContent(builder, params);
129+
builder.endObject();
130+
}
131+
builder.endArray();
132+
}
133+
}
134+
135+
@Override
136+
public boolean equals(Object o) {
137+
if (this == o) {
138+
return true;
139+
}
140+
if (o == null || getClass() != o.getClass()) {
141+
return false;
142+
}
143+
BaseTasksResponse response = (BaseTasksResponse) o;
144+
return taskFailures.equals(response.taskFailures)
145+
&& nodeFailures.equals(response.nodeFailures);
146+
}
147+
148+
@Override
149+
public int hashCode() {
150+
return Objects.hash(taskFailures, nodeFailures);
151+
}
106152
}

test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import java.security.NoSuchAlgorithmException;
6969
import java.security.cert.CertificateException;
7070
import java.util.ArrayList;
71+
import java.util.Arrays;
7172
import java.util.HashSet;
7273
import java.util.List;
7374
import java.util.Map;
@@ -459,7 +460,7 @@ private void wipeClusterSettings() throws IOException {
459460
}
460461
}
461462

462-
private void wipeRollupJobs() throws IOException {
463+
private void wipeRollupJobs() throws IOException, InterruptedException {
463464
Response response = adminClient().performRequest(new Request("GET", "/_xpack/rollup/job/_all"));
464465
Map<String, Object> jobs = entityAsMap(response);
465466
@SuppressWarnings("unchecked")
@@ -470,6 +471,29 @@ private void wipeRollupJobs() throws IOException {
470471
return;
471472
}
472473

474+
for (Map<String, Object> jobConfig : jobConfigs) {
475+
@SuppressWarnings("unchecked")
476+
String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
477+
Request request = new Request("POST", "/_xpack/rollup/job/" + jobId + "/_stop");
478+
request.addParameter("ignore", "404");
479+
logger.debug("stopping rollup job [{}]", jobId);
480+
adminClient().performRequest(request);
481+
}
482+
483+
// TODO this is temporary until StopJob API gains the ability to block until stopped
484+
awaitBusy(() -> {
485+
Request request = new Request("GET", "/_xpack/rollup/job/_all");
486+
try {
487+
Response jobsResponse = adminClient().performRequest(request);
488+
String body = EntityUtils.toString(jobsResponse.getEntity());
489+
logger.error(body);
490+
// If the body contains any of the non-stopped states, at least one job is not finished yet
491+
return Arrays.stream(new String[]{"started", "aborting", "stopping", "indexing"}).noneMatch(body::contains);
492+
} catch (IOException e) {
493+
return false;
494+
}
495+
}, 10, TimeUnit.SECONDS);
496+
473497
for (Map<String, Object> jobConfig : jobConfigs) {
474498
@SuppressWarnings("unchecked")
475499
String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java

Lines changed: 89 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,30 @@
77

88

99
import org.elasticsearch.action.Action;
10+
import org.elasticsearch.action.ActionRequestBuilder;
1011
import org.elasticsearch.action.ActionRequestValidationException;
11-
import org.elasticsearch.action.support.master.AcknowledgedRequest;
12-
import org.elasticsearch.action.support.master.AcknowledgedResponse;
13-
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
12+
import org.elasticsearch.action.FailedNodeException;
13+
import org.elasticsearch.action.TaskOperationFailure;
14+
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
15+
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
1416
import org.elasticsearch.client.ElasticsearchClient;
1517
import org.elasticsearch.common.io.stream.StreamInput;
1618
import org.elasticsearch.common.io.stream.StreamOutput;
17-
import org.elasticsearch.common.xcontent.ToXContent;
19+
import org.elasticsearch.common.io.stream.Writeable;
20+
import org.elasticsearch.common.xcontent.ToXContentFragment;
21+
import org.elasticsearch.common.xcontent.ToXContentObject;
1822
import org.elasticsearch.common.xcontent.XContentBuilder;
23+
import org.elasticsearch.tasks.Task;
1924
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2025
import org.elasticsearch.xpack.core.rollup.RollupField;
2126

2227
import java.io.IOException;
28+
import java.util.Collections;
29+
import java.util.List;
2330
import java.util.Objects;
2431

25-
public class DeleteRollupJobAction extends Action<DeleteRollupJobAction.Request, AcknowledgedResponse,
26-
DeleteRollupJobAction.RequestBuilder> {
32+
public class DeleteRollupJobAction extends Action<DeleteRollupJobAction.Request, DeleteRollupJobAction.Response,
33+
DeleteRollupJobAction.RequestBuilder> {
2734

2835
public static final DeleteRollupJobAction INSTANCE = new DeleteRollupJobAction();
2936
public static final String NAME = "cluster:admin/xpack/rollup/delete";
@@ -38,11 +45,11 @@ public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
3845
}
3946

4047
@Override
41-
public AcknowledgedResponse newResponse() {
42-
return new AcknowledgedResponse();
48+
public Response newResponse() {
49+
return new Response();
4350
}
4451

45-
public static class Request extends AcknowledgedRequest<Request> implements ToXContent {
52+
public static class Request extends BaseTasksRequest<Request> implements ToXContentFragment {
4653
private String id;
4754

4855
public Request(String id) {
@@ -51,6 +58,11 @@ public Request(String id) {
5158

5259
public Request() {}
5360

61+
@Override
62+
public boolean match(Task task) {
63+
return task.getDescription().equals(RollupField.NAME + "_" + id);
64+
}
65+
5466
public String getId() {
5567
return id;
5668
}
@@ -96,10 +108,75 @@ public boolean equals(Object obj) {
96108
}
97109
}
98110

99-
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, AcknowledgedResponse, RequestBuilder> {
100-
111+
public static class RequestBuilder extends ActionRequestBuilder<DeleteRollupJobAction.Request,
112+
DeleteRollupJobAction.Response, RequestBuilder> {
101113
protected RequestBuilder(ElasticsearchClient client, DeleteRollupJobAction action) {
102-
super(client, action, new Request());
114+
super(client, action, new DeleteRollupJobAction.Request());
115+
}
116+
}
117+
118+
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
119+
120+
private boolean acknowledged;
121+
122+
public Response(StreamInput in) throws IOException {
123+
super(Collections.emptyList(), Collections.emptyList());
124+
readFrom(in);
125+
}
126+
127+
public Response(boolean acknowledged, List<TaskOperationFailure> taskFailures, List<FailedNodeException> nodeFailures) {
128+
super(taskFailures, nodeFailures);
129+
this.acknowledged = acknowledged;
130+
}
131+
132+
public Response(boolean acknowledged) {
133+
super(Collections.emptyList(), Collections.emptyList());
134+
this.acknowledged = acknowledged;
135+
}
136+
137+
public Response() {
138+
super(Collections.emptyList(), Collections.emptyList());
139+
this.acknowledged = false;
140+
}
141+
142+
public boolean isDeleted() {
143+
return acknowledged;
144+
}
145+
146+
@Override
147+
public void readFrom(StreamInput in) throws IOException {
148+
super.readFrom(in);
149+
acknowledged = in.readBoolean();
150+
}
151+
152+
@Override
153+
public void writeTo(StreamOutput out) throws IOException {
154+
super.writeTo(out);
155+
out.writeBoolean(acknowledged);
156+
}
157+
158+
@Override
159+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
160+
builder.startObject();
161+
{
162+
toXContentCommon(builder, params);
163+
builder.field("acknowledged", acknowledged);
164+
}
165+
builder.endObject();
166+
return builder;
167+
}
168+
169+
@Override
170+
public boolean equals(Object o) {
171+
if (this == o) return true;
172+
if (o == null || getClass() != o.getClass()) return false;
173+
DeleteRollupJobAction.Response response = (DeleteRollupJobAction.Response) o;
174+
return super.equals(o) && acknowledged == response.acknowledged;
175+
}
176+
177+
@Override
178+
public int hashCode() {
179+
return Objects.hash(super.hashCode(), acknowledged);
103180
}
104181
}
105182
}

0 commit comments

Comments
 (0)