Skip to content

Watcher: Make start/stop cycle more predictable and synchronous #30118

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 3, 2018
5 changes: 5 additions & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ option. ({pull}30140[#29658])
Added new "Request" object flavored request methods. Prefer these instead of the
multi-argument versions. ({pull}29623[#29623])

The cluster state listener that decides whether watcher should be
stopped/started/paused now runs far less code in an executor and is more
synchronous and predictable. In addition, the trigger engine thread is only
started on data nodes, and the Execute Watch API can be triggered regardless of
whether watcher is started or stopped. ({pull}30118[#30118])

[float]
=== Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.apache.http.HttpStatus;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand All @@ -17,6 +18,7 @@
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ClientYamlTestResponse;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
Expand Down Expand Up @@ -83,7 +85,6 @@ private void waitForTemplates() throws Exception {
templates.addAll(Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(),
AnomalyDetectorsIndex.jobResultsIndexPrefix()));
templates.addAll(Arrays.asList(WatcherIndexTemplateRegistryField.TEMPLATE_NAMES));

for (String template : templates) {
awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(),
Expand All @@ -97,19 +98,49 @@ private void waitForWatcher() throws Exception {
// ensure watcher is started, so that a test can stop watcher and everything still works fine
if (isWatcherTest()) {
assertBusy(() -> {
try {
ClientYamlTestResponse response =
getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap());
String state = (String) response.evaluate("stats.0.watcher_state");
if ("started".equals(state) == false) {
getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap());
}
// assertion required to exit the assertBusy lambda
assertThat(state, is("started"));
} catch (IOException e) {
throw new AssertionError(e);
ClientYamlTestResponse response =
getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap());
String state = (String) response.evaluate("stats.0.watcher_state");

switch (state) {
case "stopped":
ClientYamlTestResponse startResponse =
getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap());
boolean isAcknowledged = (boolean) startResponse.evaluate("acknowledged");
assertThat(isAcknowledged, is(true));
break;
case "stopping":
throw new AssertionError("waiting until stopping state reached stopped state to start again");
case "starting":
throw new AssertionError("waiting until starting state reached started state");
case "started":
// all good here, we are done
break;
default:
throw new AssertionError("unknown state[" + state + "]");
}
});

for (String template : WatcherIndexTemplateRegistryField.TEMPLATE_NAMES) {
awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(),
response -> true,
() -> "Exception when waiting for [" + template + "] template to be created");
}

boolean existsWatcherIndex = adminClient().performRequest("HEAD", ".watches").getStatusLine().getStatusCode() == 200;
if (existsWatcherIndex == false) {
return;
}
Response response = adminClient().performRequest("GET", ".watches/_search", Collections.singletonMap("size", "1000"));
ObjectPath objectPathResponse = ObjectPath.createFromResponse(response);
int totalHits = objectPathResponse.evaluate("hits.total");
if (totalHits > 0) {
List<Map<String, Object>> hits = objectPathResponse.evaluate("hits.hits");
for (Map<String, Object> hit : hits) {
String id = (String) hit.get("_id");
assertOK(adminClient().performRequest("DELETE", "_xpack/watcher/watch/" + id));
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
final WatchParser watchParser = new WatchParser(settings, triggerService, registry, inputRegistry, cryptoService, getClock());

final ExecutionService executionService = new ExecutionService(settings, historyStore, triggeredWatchStore, watchExecutor,
getClock(), watchParser, clusterService, client);
getClock(), watchParser, clusterService, client, threadPool.generic());

final Consumer<Iterable<TriggerEvent>> triggerEngineListener = getTriggerEngineListener(executionService);
triggerService.register(triggerEngineListener);
Expand All @@ -360,7 +360,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
watchParser, client);

final WatcherLifeCycleService watcherLifeCycleService =
new WatcherLifeCycleService(settings, threadPool, clusterService, watcherService);
new WatcherLifeCycleService(settings, clusterService, watcherService);

listener = new WatcherIndexingListener(settings, watchParser, getClock(), triggerService);
clusterService.addListener(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id());
if (shouldBeTriggered) {
if (watch.status().state().isActive()) {
logger.debug("adding watch [{}] to trigger", watch.id());
logger.debug("adding watch [{}] to trigger service", watch.id());
triggerService.add(watch);
} else {
logger.debug("removing watch [{}] to trigger", watch.id());
logger.debug("removing watch [{}] to trigger service", watch.id());
triggerService.remove(watch.id());
}
} else {
Expand Down
Loading