Skip to content

Commit ec360da

Browse files
committed
Revert "Ensure no Watches are running after Watcher is stopped."
This reverts commit 926a671.
1 parent ce5dd1b commit ec360da

File tree

1 file changed

+10
-9
lines changed

1 file changed

+10
-9
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public class ExecutionService {
106106
private final WatchExecutor executor;
107107
private final ExecutorService genericExecutor;
108108

109-
private CurrentExecutions currentExecutions;
109+
private AtomicReference<CurrentExecutions> currentExecutions = new AtomicReference<>();
110110
private final AtomicBoolean paused = new AtomicBoolean(false);
111111

112112
public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor,
@@ -123,7 +123,7 @@ public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredW
123123
this.client = client;
124124
this.genericExecutor = genericExecutor;
125125
this.indexDefaultTimeout = settings.getAsTime("xpack.watcher.internal.ops.index.default_timeout", TimeValue.timeValueSeconds(30));
126-
this.currentExecutions = new CurrentExecutions();
126+
this.currentExecutions.set(new CurrentExecutions());
127127
}
128128

129129
public void unPause() {
@@ -169,12 +169,12 @@ public long executionThreadPoolMaxSize() {
169169

170170
// for testing only
171171
CurrentExecutions getCurrentExecutions() {
172-
return currentExecutions;
172+
return currentExecutions.get();
173173
}
174174

175175
public List<WatchExecutionSnapshot> currentExecutions() {
176176
List<WatchExecutionSnapshot> currentExecutions = new ArrayList<>();
177-
for (WatchExecution watchExecution : this.currentExecutions) {
177+
for (WatchExecution watchExecution : this.currentExecutions.get()) {
178178
currentExecutions.add(watchExecution.createSnapshot());
179179
}
180180
// Lets show the longest running watch first:
@@ -279,7 +279,7 @@ public WatchRecord execute(WatchExecutionContext ctx) {
279279
WatchRecord record = null;
280280
final String watchId = ctx.id().watchId();
281281
try {
282-
boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread()));
282+
boolean executionAlreadyExists = currentExecutions.get().put(watchId, new WatchExecution(ctx, Thread.currentThread()));
283283
if (executionAlreadyExists) {
284284
logger.trace("not executing watch [{}] because it is already queued", watchId);
285285
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool");
@@ -334,7 +334,7 @@ record = createWatchRecord(record, ctx, e);
334334

335335
triggeredWatchStore.delete(ctx.id());
336336
}
337-
currentExecutions.remove(watchId);
337+
currentExecutions.get().remove(watchId);
338338
logger.debug("finished [{}]/[{}]", watchId, ctx.id());
339339
}
340340
return record;
@@ -578,9 +578,10 @@ public Counters executionTimes() {
578578
* This clears out the current executions and sets new empty current executions
579579
* This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea
580580
*/
581-
private synchronized void clearExecutions() {
582-
currentExecutions.sealAndAwaitEmpty(maxStopTimeout);
583-
currentExecutions = new CurrentExecutions();
581+
private void clearExecutions() {
582+
final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions());
583+
// clear old executions in background, no need to wait
584+
genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout));
584585
}
585586

586587
// the watch execution task takes another runnable as parameter

0 commit comments

Comments
 (0)