Skip to content

Commit 814cb73

Browse files
committed
Watcher: Make start/stop cycle more predictable and synchronous (#30118)
The current implementation starts/stops watcher using an executor. This can result in our of order operations. This commit reduces those executor calls to an absolute minimum in order to be able to do state changes within the cluster state listener method, which runs in sequence. When a state change occurs that forces the watcher service to pause (like no watcher index, no master node, no local shards), the service is now in a paused state. Pausing is a super lightweight operation, which marks the ExecutionService as paused and waits for the currently executing watches to finish in the background via an executor. The same applies for stopping, the potentially long running operation is outsourced in to an executor, as waiting for executed watches is decoupled from the current state. The only other long running operation is starting, where watches need to be loaded. This is also done via an executor, but has an additional protection by checking the cluster state version it was started with. If another cluster state version was trying to load the watches, then this loading will not take effect. This PR also cleans up some unused states, like the a simple boolean in the HistoryStore/TriggeredWatchStore marking it as started or stopped, as this can now be caught in the execution service. Another advantage of this approach is the fact, that now only triggered watches are not getting executed, while watches that are run via the Execute Watch API will still be executed regardless if watcher is stopped or not. Lastly the TickerScheduleTriggerEngine thread now only starts on data nodes.
1 parent af770a2 commit 814cb73

File tree

21 files changed

+836
-1063
lines changed

21 files changed

+836
-1063
lines changed

docs/CHANGELOG.asciidoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,11 @@ more information, see <<notification-settings>>.
379379

380380
//[float]
381381
//=== Enhancements
382+
The cluster state listener to decide if watcher should be
383+
stopped/started/paused now runs far less code in an executor but is more
384+
synchronous and predictable. Also the trigger engine thread is only started on
385+
data nodes. And the Execute Watch API can be triggered regardless is watcher is
386+
started or stopped. ({pull}30118[#30118])
382387

383388
[float]
384389
=== Bug Fixes

x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
99
import org.apache.http.HttpStatus;
1010
import org.elasticsearch.ElasticsearchException;
11+
import org.elasticsearch.client.Response;
1112
import org.elasticsearch.common.CheckedFunction;
13+
import org.elasticsearch.common.collect.MapBuilder;
1214
import org.elasticsearch.common.settings.Settings;
1315
import org.elasticsearch.common.util.concurrent.ThreadContext;
1416
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@@ -17,6 +19,7 @@
1719
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
1820
import org.elasticsearch.test.rest.yaml.ClientYamlTestResponse;
1921
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
22+
import org.elasticsearch.test.rest.yaml.ObjectPath;
2023
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
2124
import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
2225
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -83,7 +86,6 @@ private void waitForTemplates() throws Exception {
8386
templates.addAll(Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
8487
AnomalyDetectorsIndex.jobStateIndexName(),
8588
AnomalyDetectorsIndex.jobResultsIndexPrefix()));
86-
templates.addAll(Arrays.asList(WatcherIndexTemplateRegistryField.TEMPLATE_NAMES));
8789

8890
for (String template : templates) {
8991
awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(),
@@ -97,19 +99,46 @@ private void waitForWatcher() throws Exception {
9799
// ensure watcher is started, so that a test can stop watcher and everything still works fine
98100
if (isWatcherTest()) {
99101
assertBusy(() -> {
100-
try {
101-
ClientYamlTestResponse response =
102-
getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap());
103-
String state = (String) response.evaluate("stats.0.watcher_state");
104-
if ("started".equals(state) == false) {
105-
getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap());
106-
}
107-
// assertion required to exit the assertBusy lambda
108-
assertThat(state, is("started"));
109-
} catch (IOException e) {
110-
throw new AssertionError(e);
102+
ClientYamlTestResponse response =
103+
getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap());
104+
String state = (String) response.evaluate("stats.0.watcher_state");
105+
106+
switch (state) {
107+
case "stopped":
108+
ClientYamlTestResponse startResponse =
109+
getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap());
110+
boolean isAcknowledged = (boolean) startResponse.evaluate("acknowledged");
111+
assertThat(isAcknowledged, is(true));
112+
break;
113+
case "stopping":
114+
throw new AssertionError("waiting until stopping state reached stopped state to start again");
115+
case "starting":
116+
throw new AssertionError("waiting until starting state reached started state");
117+
case "started":
118+
// all good here, we are done
119+
break;
120+
default:
121+
throw new AssertionError("unknown state[" + state + "]");
111122
}
112123
});
124+
125+
for (String template : WatcherIndexTemplateRegistryField.TEMPLATE_NAMES) {
126+
awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(),
127+
response -> true,
128+
() -> "Exception when waiting for [" + template + "] template to be created");
129+
}
130+
131+
Map<String, String> params = MapBuilder.<String, String>newMapBuilder().put("size", "1000").put("ignore", "404").map();
132+
Response response = adminClient().performRequest("GET", ".watches/_search", params);
133+
ObjectPath objectPathResponse = ObjectPath.createFromResponse(response);
134+
Integer totalHits = objectPathResponse.evaluate("hits.total");
135+
if (totalHits != null && totalHits > 0) {
136+
List<Map<String, Object>> hits = objectPathResponse.evaluate("hits.hits");
137+
for (Map<String, Object> hit : hits) {
138+
String id = (String) hit.get("_id");
139+
assertOK(adminClient().performRequest("DELETE", "_xpack/watcher/watch/" + id));
140+
}
141+
}
113142
}
114143
}
115144

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
370370
final WatchParser watchParser = new WatchParser(settings, triggerService, registry, inputRegistry, cryptoService, getClock());
371371

372372
final ExecutionService executionService = new ExecutionService(settings, historyStore, triggeredWatchStore, watchExecutor,
373-
getClock(), watchParser, clusterService, client);
373+
getClock(), watchParser, clusterService, client, threadPool.generic());
374374

375375
final Consumer<Iterable<TriggerEvent>> triggerEngineListener = getTriggerEngineListener(executionService);
376376
triggerService.register(triggerEngineListener);
@@ -379,7 +379,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
379379
watchParser, client);
380380

381381
final WatcherLifeCycleService watcherLifeCycleService =
382-
new WatcherLifeCycleService(settings, threadPool, clusterService, watcherService);
382+
new WatcherLifeCycleService(settings, clusterService, watcherService);
383383

384384
listener = new WatcherIndexingListener(settings, watchParser, getClock(), triggerService);
385385
clusterService.addListener(listener);

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,10 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
123123
boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id());
124124
if (shouldBeTriggered) {
125125
if (watch.status().state().isActive()) {
126-
logger.debug("adding watch [{}] to trigger", watch.id());
126+
logger.debug("adding watch [{}] to trigger service", watch.id());
127127
triggerService.add(watch);
128128
} else {
129-
logger.debug("removing watch [{}] to trigger", watch.id());
129+
logger.debug("removing watch [{}] to trigger service", watch.id());
130130
triggerService.remove(watch.id());
131131
}
132132
} else {

0 commit comments

Comments
 (0)