Skip to content

[ML Data Frame] Refactor stop logic (#42644) #42763

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,28 +90,21 @@ public synchronized IndexerState start() {
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is
* running in the background, {@link #onStop()} will be called when the background job
* detects that the indexer is stopped.
* If there is no job running when this function is called
* the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} called directly.
* If there is no job running when this function is called the returned
* state is {@link IndexerState#STOPPED} and {@link #onStop()} will not be called.
*
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
public synchronized IndexerState stop() {
AtomicBoolean wasStartedAndSetStopped = new AtomicBoolean(false);
IndexerState currentState = state.updateAndGet(previousState -> {
return state.updateAndGet(previousState -> {
if (previousState == IndexerState.INDEXING) {
return IndexerState.STOPPING;
} else if (previousState == IndexerState.STARTED) {
wasStartedAndSetStopped.set(true);
return IndexerState.STOPPED;
} else {
return previousState;
}
});

if (wasStartedAndSetStopped.get()) {
onStop();
}
return currentState;
}

/**
Expand Down Expand Up @@ -288,20 +281,22 @@ private void finishWithIndexingFailure(Exception exc) {
}

private IndexerState finishAndSetState() {
return state.updateAndGet(prev -> {
AtomicBoolean callOnStop = new AtomicBoolean(false);
AtomicBoolean callOnAbort = new AtomicBoolean(false);
IndexerState updatedState = state.updateAndGet(prev -> {
switch (prev) {
case INDEXING:
// ready for another job
return IndexerState.STARTED;

case STOPPING:
callOnStop.set(true);
// must be started again
onStop();
return IndexerState.STOPPED;

case ABORTING:
callOnAbort.set(true);
// abort and exit
onAbort();
return IndexerState.ABORTING; // This shouldn't matter, since onAbort() will kill the task first

case STOPPED:
Expand All @@ -316,6 +311,14 @@ private IndexerState finishAndSetState() {
throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]");
}
});

if (callOnStop.get()) {
onStop();
} else if (callOnAbort.get()) {
onAbort();
}

return updatedState;
}

private void onSearchResponse(SearchResponse searchResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,25 +268,6 @@ public void testStateMachineBrokenSearch() throws InterruptedException {
}
}

public void testStop_AfterIndexerIsFinished() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
indexer.start();
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
countDownLatch.countDown();
assertTrue(awaitBusy(() -> isFinished.get()));

indexer.stop();
assertTrue(isStopped.get());
assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
} finally {
executor.shutdownNow();
}
}

public void testStop_WhileIndexing() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public void cleanTransforms() throws IOException {
cleanUp();
}

@AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public void testDataFrameTransformCrud() throws Exception {
createReviewsIndex();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.junit.Before;
Expand All @@ -23,7 +22,6 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameAuditorIT extends DataFrameRestTestCase {

private static final String TEST_USER_NAME = "df_admin_plus_data";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
Expand All @@ -23,7 +22,6 @@

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameConfigurationIndexIT extends DataFrameRestTestCase {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
Expand All @@ -22,7 +21,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {

private static final String TEST_USER_NAME = "df_user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand All @@ -16,7 +15,6 @@
import java.io.IOException;
import java.util.Map;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameMetaDataIT extends DataFrameRestTestCase {

private boolean indicesCreated = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;
Expand All @@ -22,7 +21,6 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFramePivotRestIT extends DataFrameRestTestCase {

private static final String TEST_USER_NAME = "df_admin_plus_data";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -20,7 +19,6 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.equalTo;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {

public void testDummy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
Expand All @@ -23,7 +22,6 @@
import static org.elasticsearch.xpack.core.dataframe.DataFrameField.INDEX_DOC_TYPE;
import static org.elasticsearch.xpack.dataframe.DataFrameFeatureSet.PROVIDED_STATS;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameUsageIT extends DataFrameRestTestCase {
private boolean indicesCreated = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final Map<String, Object> initialPosition;
private final IndexerState initialIndexerState;

private final SetOnce<DataFrameIndexer> indexer = new SetOnce<>();
private final SetOnce<ClientDataFrameIndexer> indexer = new SetOnce<>();

private final AtomicReference<DataFrameTransformTaskState> taskState;
private final AtomicReference<String> stateReason;
Expand Down Expand Up @@ -125,7 +125,7 @@ public Status getStatus() {
return getState();
}

private DataFrameIndexer getIndexer() {
private ClientDataFrameIndexer getIndexer() {
return indexer.get();
}

Expand Down Expand Up @@ -236,7 +236,10 @@ public synchronized void stop() {
return;
}

getIndexer().stop();
IndexerState state = getIndexer().stop();
if (state == IndexerState.STOPPED) {
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop());
}
}

@Override
Expand Down Expand Up @@ -530,40 +533,36 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi
next.run();
return;
}
// If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING`
// OR we called `doSaveState` manually as the indexer was not actively running.
// Since we save the state to an index, we should make sure that our task state is in parity with the indexer state
if (indexerState.equals(IndexerState.STOPPED)) {
transformTask.setTaskStateStopped();
}

final DataFrameTransformState state = new DataFrameTransformState(
transformTask.taskState.get(),
indexerState,
getPosition(),
position,
transformTask.currentCheckpoint.get(),
transformTask.stateReason.get(),
getProgress());
logger.debug("Updating persistent state of transform [{}] to [{}]", transformConfig.getId(), state.toString());

// Persisting stats when we call `doSaveState` should be ok as we only call it on a state transition and
// only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> updateClusterStateListener = ActionListener.wrap(
task -> {
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
));
},
exc -> {
logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc);
next.run();
}
);

transformTask.persistStateToClusterState(state, updateClusterStateListener);
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
));
}

@Override
Expand Down Expand Up @@ -602,20 +601,7 @@ protected void onFinish(ActionListener<Void> listener) {
protected void onStop() {
auditor.info(transformConfig.getId(), "Indexer has stopped");
logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());

transformTask.setTaskStateStopped();
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, transformTask.getState(), getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
transformTask.shutdown();
},
statsExc -> {
transformTask.shutdown();
logger.error("Updating saving stats of transform [" + transformConfig.getId() + "] failed", statsExc);
}
));
transformTask.shutdown();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,10 @@ teardown:
- do:
data_frame.stop_data_frame_transform:
transform_id: "airline-transform-start-stop"
wait_for_completion: true
- match: { acknowledged: true }


- do:
data_frame.get_data_frame_transform_stats:
transform_id: "airline-transform-start-later"
Expand All @@ -209,3 +211,46 @@ teardown:
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-start-later"

---
"Test stop all":
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform-stop-all"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-start-later" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-stop-all"
- match: { acknowledged: true }

- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"
- match: { acknowledged: true }

- do:
data_frame.stop_data_frame_transform:
transform_id: "_all"
wait_for_completion: true
- match: { acknowledged: true }

- do:
data_frame.get_data_frame_transform_stats:
transform_id: "*"
- match: { count: 2 }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }
- match: { transforms.1.state.indexer_state: "stopped" }
- match: { transforms.1.state.task_state: "stopped" }

- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-stop-all"