Skip to content

Commit 551353d

Browse files
authored
[ML][Data Frame] only complete task after state persistence (#43230)
* [ML][Data Frame] only complete task after state persistence.
  There is a race condition where the task could be completed while there is still a pending document write. This change moves the task cancellation into the ActionListener of the state persistence.
  intermediate commit
  intermediate commit
* removing unused import
* removing unused const
* refreshing internal index after waiting for task to complete
* adjusting test data generation
1 parent 37e4008 commit 551353d

File tree

4 files changed

+40
-19
lines changed

4 files changed

+40
-19
lines changed

x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java

+7-10
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,6 @@
6565

6666
abstract class DataFrameIntegTestCase extends ESRestTestCase {
6767

68-
protected static final String REVIEWS_INDEX_NAME = "data_frame_reviews";
69-
7068
private Map<String, DataFrameTransformConfig> transformConfigs = new HashMap<>();
7169

7270
protected void cleanUp() throws IOException {
@@ -213,8 +211,7 @@ protected DataFrameTransformConfig createTransformConfig(String id,
213211
.build();
214212
}
215213

216-
protected void createReviewsIndex() throws Exception {
217-
final int numDocs = 1000;
214+
protected void createReviewsIndex(String indexName, int numDocs) throws Exception {
218215
RestHighLevelClient restClient = new TestRestHighLevelClient();
219216

220217
// create mapping
@@ -241,12 +238,12 @@ protected void createReviewsIndex() throws Exception {
241238
}
242239
builder.endObject();
243240
CreateIndexResponse response =
244-
restClient.indices().create(new CreateIndexRequest(REVIEWS_INDEX_NAME).mapping(builder), RequestOptions.DEFAULT);
241+
restClient.indices().create(new CreateIndexRequest(indexName).mapping(builder), RequestOptions.DEFAULT);
245242
assertThat(response.isAcknowledged(), is(true));
246243
}
247244

248245
// create index
249-
BulkRequest bulk = new BulkRequest(REVIEWS_INDEX_NAME);
246+
BulkRequest bulk = new BulkRequest(indexName);
250247
int day = 10;
251248
for (int i = 0; i < numDocs; i++) {
252249
long user = i % 28;
@@ -256,7 +253,7 @@ protected void createReviewsIndex() throws Exception {
256253
int min = 10 + (i % 49);
257254
int sec = 10 + (i % 49);
258255

259-
String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec + "Z";
256+
String date_string = "2017-01-" + (day < 10 ? "0" + day : day) + "T" + hour + ":" + min + ":" + sec + "Z";
260257

261258
StringBuilder sourceBuilder = new StringBuilder();
262259
sourceBuilder.append("{\"user_id\":\"")
@@ -277,13 +274,13 @@ protected void createReviewsIndex() throws Exception {
277274
if (i % 50 == 0) {
278275
BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT);
279276
assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
280-
bulk = new BulkRequest(REVIEWS_INDEX_NAME);
281-
day += 1;
277+
bulk = new BulkRequest(indexName);
278+
day = (day + 1) % 28;
282279
}
283280
}
284281
BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT);
285282
assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
286-
restClient.indices().refresh(new RefreshRequest(REVIEWS_INDEX_NAME), RequestOptions.DEFAULT);
283+
restClient.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
287284
}
288285

289286
protected Map<String, Object> toLazy(ToXContent parsedObject) throws Exception {

x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ public void cleanTransforms() throws IOException {
3030
}
3131

3232
public void testDataFrameTransformCrud() throws Exception {
33-
createReviewsIndex();
33+
String indexName = "basic-crud-reviews";
34+
createReviewsIndex(indexName, 100);
3435

3536
Map<String, SingleGroupSource> groups = new HashMap<>();
3637
groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null, null));
@@ -45,7 +46,7 @@ public void testDataFrameTransformCrud() throws Exception {
4546
groups,
4647
aggs,
4748
"reviews-by-user-business-day",
48-
REVIEWS_INDEX_NAME);
49+
indexName);
4950

5051
assertTrue(putDataFrameTransform(config, RequestOptions.DEFAULT).isAcknowledged());
5152
assertTrue(startDataFrameTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged());
@@ -56,7 +57,8 @@ public void testDataFrameTransformCrud() throws Exception {
5657
assertBusy(() ->
5758
assertThat(getDataFrameTransformStats(config.getId()).getTransformsStateAndStats().get(0).getTransformState().getIndexerState(),
5859
equalTo(IndexerState.STOPPED)));
60+
stopDataFrameTransform(config.getId());
61+
deleteDataFrameTransform(config.getId());
5962
}
6063

61-
6264
}

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@
55
*/
66
package org.elasticsearch.xpack.dataframe.action;
77

8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
810
import org.elasticsearch.ElasticsearchStatusException;
911
import org.elasticsearch.action.ActionListener;
1012
import org.elasticsearch.action.ActionListenerResponseHandler;
1113
import org.elasticsearch.action.FailedNodeException;
1214
import org.elasticsearch.action.TaskOperationFailure;
1315
import org.elasticsearch.action.support.ActionFilters;
1416
import org.elasticsearch.action.support.tasks.TransportTasksAction;
17+
import org.elasticsearch.client.Client;
1518
import org.elasticsearch.cluster.ClusterState;
1619
import org.elasticsearch.cluster.node.DiscoveryNodes;
1720
import org.elasticsearch.cluster.service.ClusterService;
@@ -26,6 +29,7 @@
2629
import org.elasticsearch.xpack.core.action.util.PageParams;
2730
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
2831
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
32+
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
2933
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
3034
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
3135

@@ -38,20 +42,25 @@ public class TransportStopDataFrameTransformAction extends
3842
TransportTasksAction<DataFrameTransformTask, StopDataFrameTransformAction.Request,
3943
StopDataFrameTransformAction.Response, StopDataFrameTransformAction.Response> {
4044

45+
private static final Logger logger = LogManager.getLogger(TransportStopDataFrameTransformAction.class);
46+
4147
private final ThreadPool threadPool;
4248
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
4349
private final PersistentTasksService persistentTasksService;
50+
private final Client client;
4451

4552
@Inject
4653
public TransportStopDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
4754
ClusterService clusterService, ThreadPool threadPool,
4855
PersistentTasksService persistentTasksService,
49-
DataFrameTransformsConfigManager dataFrameTransformsConfigManager) {
56+
DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
57+
Client client) {
5058
super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, StopDataFrameTransformAction.Request::new,
5159
StopDataFrameTransformAction.Response::new, StopDataFrameTransformAction.Response::new, ThreadPool.Names.SAME);
5260
this.threadPool = threadPool;
5361
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
5462
this.persistentTasksService = persistentTasksService;
63+
this.client = client;
5564
}
5665

5766
@Override
@@ -132,12 +141,26 @@ protected StopDataFrameTransformAction.Response newResponse(StopDataFrameTransfo
132141
waitForStopListener(StopDataFrameTransformAction.Request request,
133142
ActionListener<StopDataFrameTransformAction.Response> listener) {
134143

144+
ActionListener<StopDataFrameTransformAction.Response> onStopListener = ActionListener.wrap(
145+
waitResponse ->
146+
client.admin()
147+
.indices()
148+
.prepareRefresh(DataFrameInternalIndex.INDEX_NAME)
149+
.execute(ActionListener.wrap(
150+
r -> listener.onResponse(waitResponse),
151+
e -> {
152+
logger.info("Failed to refresh internal index after delete", e);
153+
listener.onResponse(waitResponse);
154+
})
155+
),
156+
listener::onFailure
157+
);
135158
return ActionListener.wrap(
136159
response -> {
137160
// Wait until the persistent task is stopped
138161
// Switch over to Generic threadpool so we don't block the network thread
139162
threadPool.generic().execute(() ->
140-
waitForDataFrameStopped(request.getExpandedIds(), request.getTimeout(), listener));
163+
waitForDataFrameStopped(request.getExpandedIds(), request.getTimeout(), onStopListener));
141164
},
142165
listener::onFailure
143166
);

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ public synchronized void stop() {
248248

249249
IndexerState state = getIndexer().stop();
250250
if (state == IndexerState.STOPPED) {
251-
//doSaveState calls `onStop` when the task state is `STOPPED`
251+
getIndexer().onStop();
252252
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {});
253253
}
254254
}
@@ -610,7 +610,7 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi
610610
r -> {
611611
// for auto stop shutdown the task
612612
if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) {
613-
onStop();
613+
transformTask.shutdown();
614614
}
615615
next.run();
616616
},
@@ -620,7 +620,7 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi
620620
"Failure updating stats of transform: " + statsExc.getMessage());
621621
// for auto stop shutdown the task
622622
if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) {
623-
onStop();
623+
transformTask.shutdown();
624624
}
625625
next.run();
626626
}
@@ -666,7 +666,6 @@ protected void onFinish(ActionListener<Void> listener) {
666666
protected void onStop() {
667667
auditor.info(transformConfig.getId(), "Data frame transform has stopped.");
668668
logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());
669-
transformTask.shutdown();
670669
}
671670

672671
@Override

0 commit comments

Comments
 (0)