Skip to content

Commit 9d94d57

Browse files
authored
[ML] Complete the Data Frame task on stop (#41752)
Wait for indexer to stop then complete the persistent task on stop. If the wait_for_completion is true the request will not return until stopped.
1 parent c80da04 commit 9d94d57

File tree

15 files changed

+249
-277
lines changed

15 files changed

+249
-277
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ private void indexData(String indexName) throws IOException {
141141
@After
142142
public void cleanUpTransforms() throws IOException {
143143
for (String transformId : transformsToClean) {
144-
highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
144+
highLevelClient().dataFrame().stopDataFrameTransform(
145+
new StopDataFrameTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT);
145146
}
146147

147148
for (String transformId : transformsToClean) {
@@ -265,7 +266,7 @@ public void testStartStop() throws IOException {
265266
assertThat(statsResponse.getTransformsStateAndStats(), hasSize(1));
266267
assertEquals(IndexerState.STARTED, statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState());
267268

268-
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id);
269+
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
269270
StopDataFrameTransformResponse stopResponse =
270271
execute(stopRequest, client::stopDataFrameTransform, client::stopDataFrameTransformAsync);
271272
assertTrue(stopResponse.isStopped());

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
7676
@After
7777
public void cleanUpTransforms() throws IOException {
7878
for (String transformId : transformsToClean) {
79-
highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
79+
highLevelClient().dataFrame().stopDataFrameTransform(
80+
new StopDataFrameTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT);
8081
}
8182

8283
for (String transformId : transformsToClean) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java

+12-75
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,18 @@
77

88
import org.elasticsearch.action.Action;
99
import org.elasticsearch.action.ActionRequestValidationException;
10-
import org.elasticsearch.action.FailedNodeException;
11-
import org.elasticsearch.action.TaskOperationFailure;
12-
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
13-
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
10+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
11+
import org.elasticsearch.action.support.master.MasterNodeRequest;
1412
import org.elasticsearch.common.io.stream.StreamInput;
1513
import org.elasticsearch.common.io.stream.StreamOutput;
1614
import org.elasticsearch.common.io.stream.Writeable;
17-
import org.elasticsearch.common.xcontent.ToXContentObject;
18-
import org.elasticsearch.common.xcontent.XContentBuilder;
19-
import org.elasticsearch.tasks.Task;
2015
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
2116
import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;
2217

2318
import java.io.IOException;
24-
import java.util.Collections;
25-
import java.util.List;
2619
import java.util.Objects;
2720

28-
public class DeleteDataFrameTransformAction extends Action<DeleteDataFrameTransformAction.Response> {
21+
public class DeleteDataFrameTransformAction extends Action<AcknowledgedResponse> {
2922

3023
public static final DeleteDataFrameTransformAction INSTANCE = new DeleteDataFrameTransformAction();
3124
public static final String NAME = "cluster:admin/data_frame/delete";
@@ -35,17 +28,21 @@ private DeleteDataFrameTransformAction() {
3528
}
3629

3730
@Override
38-
public Response newResponse() {
31+
public AcknowledgedResponse newResponse() {
3932
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
4033
}
4134

4235
@Override
43-
public Writeable.Reader<Response> getResponseReader() {
44-
return Response::new;
36+
public Writeable.Reader<AcknowledgedResponse> getResponseReader() {
37+
return in -> {
38+
AcknowledgedResponse response = new AcknowledgedResponse();
39+
response.readFrom(in);
40+
return response;
41+
};
4542
}
4643

47-
public static class Request extends BaseTasksRequest<Request> {
48-
private final String id;
44+
public static class Request extends MasterNodeRequest<Request> {
45+
private String id;
4946

5047
public Request(String id) {
5148
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
@@ -60,11 +57,6 @@ public String getId() {
6057
return id;
6158
}
6259

63-
@Override
64-
public boolean match(Task task) {
65-
return task.getDescription().equals(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
66-
}
67-
6860
@Override
6961
public void writeTo(StreamOutput out) throws IOException {
7062
super.writeTo(out);
@@ -94,59 +86,4 @@ public boolean equals(Object obj) {
9486
return Objects.equals(id, other.id);
9587
}
9688
}
97-
98-
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
99-
100-
private final boolean acknowledged;
101-
102-
public Response(StreamInput in) throws IOException {
103-
super(in);
104-
acknowledged = in.readBoolean();
105-
}
106-
107-
public Response(boolean acknowledged, List<TaskOperationFailure> taskFailures, List<FailedNodeException> nodeFailures) {
108-
super(taskFailures, nodeFailures);
109-
this.acknowledged = acknowledged;
110-
}
111-
112-
public Response(boolean acknowledged) {
113-
this(acknowledged, Collections.emptyList(), Collections.emptyList());
114-
}
115-
116-
public boolean isDeleted() {
117-
return acknowledged;
118-
}
119-
120-
@Override
121-
public void writeTo(StreamOutput out) throws IOException {
122-
super.writeTo(out);
123-
out.writeBoolean(acknowledged);
124-
}
125-
126-
@Override
127-
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
128-
builder.startObject();
129-
{
130-
toXContentCommon(builder, params);
131-
builder.field("acknowledged", acknowledged);
132-
}
133-
builder.endObject();
134-
return builder;
135-
}
136-
137-
@Override
138-
public boolean equals(Object o) {
139-
if (this == o)
140-
return true;
141-
if (o == null || getClass() != o.getClass())
142-
return false;
143-
DeleteDataFrameTransformAction.Response response = (DeleteDataFrameTransformAction.Response) o;
144-
return super.equals(o) && acknowledged == response.acknowledged;
145-
}
146-
147-
@Override
148-
public int hashCode() {
149-
return Objects.hash(super.hashCode(), acknowledged);
150-
}
151-
}
15289
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

+19-5
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
* An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
2323
* it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position).
2424
* Only one background job can run simultaneously and {@link #onFinish} is called when the job
25-
* finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is
26-
* aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when
27-
* {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer.
25+
* finishes. {@link #onStop()} is called after the current search returns when the job is stopped early via a call
26+
* to {@link #stop()}. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()}
27+
* is called if the indexer is aborted while a job is running. The indexer must be started ({@link #start()}
28+
* to allow a background job to run when {@link #maybeTriggerAsyncJob(long)} is called.
29+
* {@link #stop()} can be used to stop the background job without aborting the indexer.
2830
*
2931
* In a nutshell this is a 2 cycle engine: 1st it sends a query, 2nd it indexes documents based on the response, sends the next query,
3032
* indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input.
@@ -84,8 +86,10 @@ public synchronized IndexerState start() {
8486

8587
/**
8688
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is
87-
* running in the background. If there is no job running when this function is
88-
* called, the state is directly set to {@link IndexerState#STOPPED}.
89+
* running in the background, {@link #onStop()} will be called when the background job
90+
* detects that the indexer is stopped.
91+
* If there is no job running when this function is called
92+
* the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} called directly.
8993
*
9094
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
9195
*/
@@ -94,6 +98,7 @@ public synchronized IndexerState stop() {
9498
if (previousState == IndexerState.INDEXING) {
9599
return IndexerState.STOPPING;
96100
} else if (previousState == IndexerState.STARTED) {
101+
onStop();
97102
return IndexerState.STOPPED;
98103
} else {
99104
return previousState;
@@ -251,6 +256,14 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
251256
*/
252257
protected abstract void onFinish(ActionListener<Void> listener);
253258

259+
/**
260+
* Called when the indexer is stopped. This is only called when the indexer is stopped
261+
* via {@link #stop()} as opposed to {@link #onFinish(ActionListener)} which is called
262+
* when the indexer's work is done.
263+
*/
264+
protected void onStop() {
265+
}
266+
254267
/**
255268
* Called when a background job detects that the indexer is aborted causing the
256269
* async execution to stop.
@@ -276,6 +289,7 @@ private IndexerState finishAndSetState() {
276289

277290
case STOPPING:
278291
// must be started again
292+
onStop();
279293
return IndexerState.STOPPED;
280294

281295
case ABORTING:

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformActionResponseTests.java

-22
This file was deleted.

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java

+65-10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.search.SearchHit;
1919
import org.elasticsearch.search.SearchHits;
2020
import org.elasticsearch.test.ESTestCase;
21+
import org.junit.Before;
2122

2223
import java.io.IOException;
2324
import java.util.Collections;
@@ -34,17 +35,26 @@
3435
public class AsyncTwoPhaseIndexerTests extends ESTestCase {
3536

3637
AtomicBoolean isFinished = new AtomicBoolean(false);
38+
AtomicBoolean isStopped = new AtomicBoolean(false);
39+
40+
@Before
41+
public void reset() {
42+
isFinished.set(false);
43+
isStopped.set(false);
44+
}
3745

3846
private class MockIndexer extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {
3947

4048
private final CountDownLatch latch;
4149
// test the execution order
4250
private volatile int step;
51+
private final boolean stoppedBeforeFinished;
4352

4453
protected MockIndexer(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition,
45-
CountDownLatch latch) {
54+
CountDownLatch latch, boolean stoppedBeforeFinished) {
4655
super(executor, initialState, initialPosition, new MockJobStats());
4756
this.latch = latch;
57+
this.stoppedBeforeFinished = stoppedBeforeFinished;
4858
}
4959

5060
@Override
@@ -57,7 +67,7 @@ protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
5767
awaitForLatch();
5868
assertThat(step, equalTo(3));
5969
++step;
60-
return new IterationResult<Integer>(Collections.emptyList(), 3, true);
70+
return new IterationResult<>(Collections.emptyList(), 3, true);
6171
}
6272

6373
private void awaitForLatch() {
@@ -99,7 +109,8 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
99109

100110
@Override
101111
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
102-
assertThat(step, equalTo(5));
112+
int expectedStep = stoppedBeforeFinished ? 3 : 5;
113+
assertThat(step, equalTo(expectedStep));
103114
++step;
104115
next.run();
105116
}
@@ -114,7 +125,12 @@ protected void onFinish(ActionListener<Void> listener) {
114125
assertThat(step, equalTo(4));
115126
++step;
116127
listener.onResponse(null);
117-
isFinished.set(true);
128+
assertTrue(isFinished.compareAndSet(false, true));
129+
}
130+
131+
@Override
132+
protected void onStop() {
133+
assertTrue(isStopped.compareAndSet(false, true));
118134
}
119135

120136
@Override
@@ -180,7 +196,7 @@ protected void doSaveState(IndexerState state, Integer position, Runnable next)
180196
protected void onFailure(Exception exc) {
181197
assertThat(step, equalTo(2));
182198
++step;
183-
isFinished.set(true);
199+
assertTrue(isFinished.compareAndSet(false, true));
184200
}
185201

186202
@Override
@@ -209,18 +225,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
209225
public void testStateMachine() throws Exception {
210226
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
211227
final ExecutorService executor = Executors.newFixedThreadPool(1);
212-
isFinished.set(false);
213228
try {
214229
CountDownLatch countDownLatch = new CountDownLatch(1);
215-
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch);
230+
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
216231
indexer.start();
217232
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
218233
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
219234
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
220235
countDownLatch.countDown();
221236

222237
assertThat(indexer.getPosition(), equalTo(2));
223-
ESTestCase.awaitBusy(() -> isFinished.get());
238+
assertTrue(awaitBusy(() -> isFinished.get()));
239+
assertFalse(isStopped.get());
224240
assertThat(indexer.getStep(), equalTo(6));
225241
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
226242
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
@@ -234,18 +250,57 @@ public void testStateMachine() throws Exception {
234250
public void testStateMachineBrokenSearch() throws InterruptedException {
235251
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
236252
final ExecutorService executor = Executors.newFixedThreadPool(1);
237-
isFinished.set(false);
238253
try {
239254

240255
MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2);
241256
indexer.start();
242257
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
243258
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
244-
assertTrue(ESTestCase.awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
259+
assertTrue(awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
245260
assertThat(indexer.getStep(), equalTo(3));
246261

247262
} finally {
248263
executor.shutdownNow();
249264
}
250265
}
266+
267+
public void testStop_AfterIndexerIsFinished() throws InterruptedException {
268+
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
269+
final ExecutorService executor = Executors.newFixedThreadPool(1);
270+
try {
271+
CountDownLatch countDownLatch = new CountDownLatch(1);
272+
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
273+
indexer.start();
274+
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
275+
countDownLatch.countDown();
276+
assertTrue(awaitBusy(() -> isFinished.get()));
277+
278+
indexer.stop();
279+
assertTrue(isStopped.get());
280+
assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
281+
} finally {
282+
executor.shutdownNow();
283+
}
284+
}
285+
286+
public void testStop_WhileIndexing() throws InterruptedException {
287+
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
288+
final ExecutorService executor = Executors.newFixedThreadPool(1);
289+
try {
290+
CountDownLatch countDownLatch = new CountDownLatch(1);
291+
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, true);
292+
indexer.start();
293+
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
294+
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
295+
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
296+
indexer.stop();
297+
countDownLatch.countDown();
298+
299+
assertThat(indexer.getPosition(), equalTo(2));
300+
assertTrue(awaitBusy(() -> isStopped.get()));
301+
assertFalse(isFinished.get());
302+
} finally {
303+
executor.shutdownNow();
304+
}
305+
}
251306
}

0 commit comments

Comments
 (0)