Skip to content

Commit 2c38d12

Browse files
authored
Watcher: Make start/stop cycle more predictable and synchronous (#30118)
The current implementation starts/stops watcher using an executor. This can result in out-of-order operations. This commit reduces those executor calls to an absolute minimum in order to be able to do state changes within the cluster state listener method, which runs in sequence. When a state change occurs that forces the watcher service to pause (like no watcher index, no master node, no local shards), the service is now in a paused state. Pausing is a super lightweight operation, which marks the ExecutionService as paused and waits for the currently executing watches to finish in the background via an executor. The same applies for stopping: the potentially long-running operation is outsourced to an executor, as waiting for executed watches is decoupled from the current state. The only other long-running operation is starting, where watches need to be loaded. This is also done via an executor, but has an additional protection by checking the cluster state version it was started with. If another cluster state version was trying to load the watches, then this loading will not take effect. This PR also cleans up some unused states, like the simple boolean in the HistoryStore/TriggeredWatchStore marking it as started or stopped, as this can now be caught in the execution service. Another advantage of this approach is the fact that now only triggered watches are not getting executed, while watches that are run via the Execute Watch API will still be executed regardless of whether watcher is stopped or not. Lastly, the TickerScheduleTriggerEngine thread now only starts on data nodes.
1 parent 824e648 commit 2c38d12

File tree

20 files changed

+791
-1019
lines changed

20 files changed

+791
-1019
lines changed

docs/CHANGELOG.asciidoc

+5
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ option. ({pull}30140[#29658])
9696
Added new "Request" object flavored request methods. Prefer these instead of the
9797
multi-argument versions. ({pull}29623[#29623])
9898

99+
The cluster state listener to decide if watcher should be
100+
stopped/started/paused now runs far less code in an executor but is more
101+
synchronous and predictable. Also the trigger engine thread is only started on
102+
data nodes. And the Execute Watch API can be triggered regardless of whether watcher is
103+
started or stopped. ({pull}30118[#30118])
99104

100105
[float]
101106
=== Bug Fixes

x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java

+43-12
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
99
import org.apache.http.HttpStatus;
1010
import org.elasticsearch.ElasticsearchException;
11+
import org.elasticsearch.client.Response;
1112
import org.elasticsearch.common.CheckedFunction;
1213
import org.elasticsearch.common.settings.Settings;
1314
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -17,6 +18,7 @@
1718
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
1819
import org.elasticsearch.test.rest.yaml.ClientYamlTestResponse;
1920
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
21+
import org.elasticsearch.test.rest.yaml.ObjectPath;
2022
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
2123
import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
2224
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -83,7 +85,6 @@ private void waitForTemplates() throws Exception {
8385
templates.addAll(Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
8486
AnomalyDetectorsIndex.jobStateIndexName(),
8587
AnomalyDetectorsIndex.jobResultsIndexPrefix()));
86-
templates.addAll(Arrays.asList(WatcherIndexTemplateRegistryField.TEMPLATE_NAMES));
8788

8889
for (String template : templates) {
8990
awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(),
@@ -97,19 +98,49 @@ private void waitForWatcher() throws Exception {
9798
// ensure watcher is started, so that a test can stop watcher and everything still works fine
9899
if (isWatcherTest()) {
99100
assertBusy(() -> {
100-
try {
101-
ClientYamlTestResponse response =
102-
getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap());
103-
String state = (String) response.evaluate("stats.0.watcher_state");
104-
if ("started".equals(state) == false) {
105-
getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap());
106-
}
107-
// assertion required to exit the assertBusy lambda
108-
assertThat(state, is("started"));
109-
} catch (IOException e) {
110-
throw new AssertionError(e);
101+
ClientYamlTestResponse response =
102+
getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap());
103+
String state = (String) response.evaluate("stats.0.watcher_state");
104+
105+
switch (state) {
106+
case "stopped":
107+
ClientYamlTestResponse startResponse =
108+
getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap());
109+
boolean isAcknowledged = (boolean) startResponse.evaluate("acknowledged");
110+
assertThat(isAcknowledged, is(true));
111+
break;
112+
case "stopping":
113+
throw new AssertionError("waiting until stopping state reached stopped state to start again");
114+
case "starting":
115+
throw new AssertionError("waiting until starting state reached started state");
116+
case "started":
117+
// all good here, we are done
118+
break;
119+
default:
120+
throw new AssertionError("unknown state[" + state + "]");
111121
}
112122
});
123+
124+
for (String template : WatcherIndexTemplateRegistryField.TEMPLATE_NAMES) {
125+
awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(),
126+
response -> true,
127+
() -> "Exception when waiting for [" + template + "] template to be created");
128+
}
129+
130+
boolean existsWatcherIndex = adminClient().performRequest("HEAD", ".watches").getStatusLine().getStatusCode() == 200;
131+
if (existsWatcherIndex == false) {
132+
return;
133+
}
134+
Response response = adminClient().performRequest("GET", ".watches/_search", Collections.singletonMap("size", "1000"));
135+
ObjectPath objectPathResponse = ObjectPath.createFromResponse(response);
136+
int totalHits = objectPathResponse.evaluate("hits.total");
137+
if (totalHits > 0) {
138+
List<Map<String, Object>> hits = objectPathResponse.evaluate("hits.hits");
139+
for (Map<String, Object> hit : hits) {
140+
String id = (String) hit.get("_id");
141+
assertOK(adminClient().performRequest("DELETE", "_xpack/watcher/watch/" + id));
142+
}
143+
}
113144
}
114145
}
115146

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
351351
final WatchParser watchParser = new WatchParser(settings, triggerService, registry, inputRegistry, cryptoService, getClock());
352352

353353
final ExecutionService executionService = new ExecutionService(settings, historyStore, triggeredWatchStore, watchExecutor,
354-
getClock(), watchParser, clusterService, client);
354+
getClock(), watchParser, clusterService, client, threadPool.generic());
355355

356356
final Consumer<Iterable<TriggerEvent>> triggerEngineListener = getTriggerEngineListener(executionService);
357357
triggerService.register(triggerEngineListener);
@@ -360,7 +360,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
360360
watchParser, client);
361361

362362
final WatcherLifeCycleService watcherLifeCycleService =
363-
new WatcherLifeCycleService(settings, threadPool, clusterService, watcherService);
363+
new WatcherLifeCycleService(settings, clusterService, watcherService);
364364

365365
listener = new WatcherIndexingListener(settings, watchParser, getClock(), triggerService);
366366
clusterService.addListener(listener);

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,10 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
123123
boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id());
124124
if (shouldBeTriggered) {
125125
if (watch.status().state().isActive()) {
126-
logger.debug("adding watch [{}] to trigger", watch.id());
126+
logger.debug("adding watch [{}] to trigger service", watch.id());
127127
triggerService.add(watch);
128128
} else {
129-
logger.debug("removing watch [{}] to trigger", watch.id());
129+
logger.debug("removing watch [{}] to trigger service", watch.id());
130130
triggerService.remove(watch.id());
131131
}
132132
} else {

0 commit comments

Comments
 (0)