Skip to content

Commit 926a671

Browse files
committed
Ensure no Watches are running after Watcher is stopped.
Watcher keeps track of which watches are currently running, keyed by watch name/id. If a watch is currently running, Watcher will not run the same watch again; instead it will produce the message "Watch is already queued in thread pool" and the state "not_executed_already_queued". When Watcher is stopped, it stops accepting new watches but allows the currently running watches to run to completion. Previously, waiting for the currently running watches to complete was done asynchronously with respect to stopping Watcher. This meant Watcher could report as fully stopped while a background thread was still waiting for all of the watches to finish before removing them from its list of currently running watches. The integration tests start and stop Watcher between each test; the goal is to ensure a clean state between tests. However, since Watcher could report "yes — I am stopped" while watches were still running, tests could bleed into each other, especially on slow machines. This could result in errors related to "Watch is already queued in thread pool" with state "not_executed_already_queued", and was VERY difficult to reproduce. This commit changes the waiting for watches on stop/pause from an async wait back to a sync wait, as it worked prior to elastic#30118. This helps ensure that, for testing scenarios, the stop is much more predictable: after Watcher is fully stopped, no watches are running. This should have little or no impact on production code, since Watcher isn't stopped/paused often, and when it is, the behavior is the same — the wait simply runs on the calling thread instead of a generic thread.
1 parent 217b875 commit 926a671

File tree

1 file changed

+9
-10
lines changed

1 file changed

+9
-10
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java

+9-10
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public class ExecutionService {
106106
private final WatchExecutor executor;
107107
private final ExecutorService genericExecutor;
108108

109-
private AtomicReference<CurrentExecutions> currentExecutions = new AtomicReference<>();
109+
private CurrentExecutions currentExecutions;
110110
private final AtomicBoolean paused = new AtomicBoolean(false);
111111

112112
public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor,
@@ -123,7 +123,7 @@ public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredW
123123
this.client = client;
124124
this.genericExecutor = genericExecutor;
125125
this.indexDefaultTimeout = settings.getAsTime("xpack.watcher.internal.ops.index.default_timeout", TimeValue.timeValueSeconds(30));
126-
this.currentExecutions.set(new CurrentExecutions());
126+
this.currentExecutions = new CurrentExecutions();
127127
}
128128

129129
public void unPause() {
@@ -169,12 +169,12 @@ public long executionThreadPoolMaxSize() {
169169

170170
// for testing only
171171
CurrentExecutions getCurrentExecutions() {
172-
return currentExecutions.get();
172+
return currentExecutions;
173173
}
174174

175175
public List<WatchExecutionSnapshot> currentExecutions() {
176176
List<WatchExecutionSnapshot> currentExecutions = new ArrayList<>();
177-
for (WatchExecution watchExecution : this.currentExecutions.get()) {
177+
for (WatchExecution watchExecution : this.currentExecutions) {
178178
currentExecutions.add(watchExecution.createSnapshot());
179179
}
180180
// Lets show the longest running watch first:
@@ -279,7 +279,7 @@ public WatchRecord execute(WatchExecutionContext ctx) {
279279
WatchRecord record = null;
280280
final String watchId = ctx.id().watchId();
281281
try {
282-
boolean executionAlreadyExists = currentExecutions.get().put(watchId, new WatchExecution(ctx, Thread.currentThread()));
282+
boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread()));
283283
if (executionAlreadyExists) {
284284
logger.trace("not executing watch [{}] because it is already queued", watchId);
285285
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool");
@@ -334,7 +334,7 @@ record = createWatchRecord(record, ctx, e);
334334

335335
triggeredWatchStore.delete(ctx.id());
336336
}
337-
currentExecutions.get().remove(watchId);
337+
currentExecutions.remove(watchId);
338338
logger.debug("finished [{}]/[{}]", watchId, ctx.id());
339339
}
340340
return record;
@@ -578,10 +578,9 @@ public Counters executionTimes() {
578578
* This clears out the current executions and sets new empty current executions
579579
* This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea
580580
*/
581-
private void clearExecutions() {
582-
final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions());
583-
// clear old executions in background, no need to wait
584-
genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout));
581+
private synchronized void clearExecutions() {
582+
currentExecutions.sealAndAwaitEmpty(maxStopTimeout);
583+
currentExecutions = new CurrentExecutions();
585584
}
586585

587586
// the watch execution task takes another runnable as parameter

0 commit comments

Comments
 (0)