Skip to content

Commit 567ad7f

Browse files
authored
Watcher add stopped listener (#43939)
When Watcher is stopped and there are still outstanding watches running Watcher will report it self as stopped. In normal cases, this is not problematic. However, for integration tests Watcher is started and stopped between each test to help ensure a clean slate for each test. The tests are blocking only on the stopped state and make an implicit assumption that all watches are finished if the Watcher is stopped. This is an incorrect assumption since Stopped really means, "I will not accept any more watches". This can lead to un-predictable behavior in the tests such as message : "Watch is already queued in thread pool" and state: "not_executed_already_queued". This can also change the .watcher-history if watches linger between tests. This commit changes the semantics of a manual stopping watcher to now mean: "I will not accept any more watches AND all running watches are complete". There is now an intermediary step "Stopping" and callback to allow transition to a "Stopped" state when all Watches have completed. Additionally since this impacts how long the tests will block waiting for a "Stopped" state, the timeout has been increased. Related: #42409
1 parent bd5dc2f commit 567ad7f

File tree

14 files changed

+107
-41
lines changed

14 files changed

+107
-41
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
*/
66
package org.elasticsearch.xpack.watcher;
77

8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
810
import org.elasticsearch.cluster.ClusterChangedEvent;
911
import org.elasticsearch.cluster.ClusterState;
1012
import org.elasticsearch.cluster.ClusterStateListener;
@@ -25,6 +27,7 @@
2527

2628
import java.util.Collections;
2729
import java.util.Comparator;
30+
import java.util.EnumSet;
2831
import java.util.List;
2932
import java.util.Set;
3033
import java.util.concurrent.atomic.AtomicReference;
@@ -35,10 +38,12 @@
3538

3639
public class WatcherLifeCycleService implements ClusterStateListener {
3740

41+
private static final Logger logger = LogManager.getLogger(WatcherLifeCycleService.class);
3842
private final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STARTED);
3943
private final AtomicReference<List<ShardRouting>> previousShardRoutings = new AtomicReference<>(Collections.emptyList());
4044
private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
4145
private volatile WatcherService watcherService;
46+
private final EnumSet<WatcherState> stopStates = EnumSet.of(WatcherState.STOPPED, WatcherState.STOPPING);
4247

4348
WatcherLifeCycleService(ClusterService clusterService, WatcherService watcherService) {
4449
this.watcherService = watcherService;
@@ -57,8 +62,10 @@ synchronized void shutDown() {
5762
this.state.set(WatcherState.STOPPING);
5863
shutDown = true;
5964
clearAllocationIds();
60-
watcherService.shutDown();
61-
this.state.set(WatcherState.STOPPED);
65+
watcherService.shutDown(() -> {
66+
this.state.set(WatcherState.STOPPED);
67+
logger.info("watcher has stopped and shutdown");
68+
});
6269
}
6370

6471
/**
@@ -88,9 +95,10 @@ public void clusterChanged(ClusterChangedEvent event) {
8895
}
8996

9097
boolean isWatcherStoppedManually = isWatcherStoppedManually(event.state());
98+
boolean isStoppedOrStopping = stopStates.contains(this.state.get());
9199
// if this is not a data node, we need to start it ourselves possibly
92100
if (event.state().nodes().getLocalNode().isDataNode() == false &&
93-
isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) {
101+
isWatcherStoppedManually == false && isStoppedOrStopping) {
94102
this.state.set(WatcherState.STARTING);
95103
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
96104
return;
@@ -99,8 +107,20 @@ public void clusterChanged(ClusterChangedEvent event) {
99107
if (isWatcherStoppedManually) {
100108
if (this.state.get() == WatcherState.STARTED) {
101109
clearAllocationIds();
102-
watcherService.stop("watcher manually marked to shutdown by cluster state update");
103-
this.state.set(WatcherState.STOPPED);
110+
boolean stopping = this.state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING);
111+
if (stopping) {
112+
//waiting to set state to stopped until after all currently running watches are finished
113+
watcherService.stop("watcher manually marked to shutdown by cluster state update", () -> {
114+
//only transition from stopping -> stopped (which may not be the case if restarted quickly)
115+
boolean stopped = state.compareAndSet(WatcherState.STOPPING, WatcherState.STOPPED);
116+
if (stopped) {
117+
logger.info("watcher has stopped");
118+
} else {
119+
logger.info("watcher has not been stopped. not currently in a stopping state, current state [{}]", state.get());
120+
}
121+
122+
});
123+
}
104124
}
105125
return;
106126
}
@@ -142,7 +162,7 @@ public void clusterChanged(ClusterChangedEvent event) {
142162
previousShardRoutings.set(localAffectedShardRoutings);
143163
if (state.get() == WatcherState.STARTED) {
144164
watcherService.reload(event.state(), "new local watcher shard allocation ids");
145-
} else if (state.get() == WatcherState.STOPPED) {
165+
} else if (isStoppedOrStopping) {
146166
this.state.set(WatcherState.STARTING);
147167
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
148168
}

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.threadpool.ThreadPool;
3636
import org.elasticsearch.xpack.core.ClientHelper;
3737
import org.elasticsearch.xpack.core.upgrade.UpgradeField;
38+
import org.elasticsearch.xpack.core.watcher.WatcherState;
3839
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
3940
import org.elasticsearch.xpack.core.watcher.watch.Watch;
4041
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
@@ -144,24 +145,29 @@ public boolean validate(ClusterState state) {
144145
}
145146

146147
/**
147-
* Stops the watcher service and marks its services as paused
148+
* Stops the watcher service and marks its services as paused. Callers should set the Watcher state to {@link WatcherState#STOPPING}
149+
* prior to calling this method.
150+
*
151+
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may not be {@code null}
148152
*/
149-
public void stop(String reason) {
153+
public void stop(String reason, Runnable stoppedListener) {
154+
assert stoppedListener != null;
150155
logger.info("stopping watch service, reason [{}]", reason);
151-
executionService.pause();
156+
executionService.pause(stoppedListener);
152157
triggerService.pauseExecution();
153158
}
154159

155160
/**
156161
* shuts down the trigger service as well to make sure there are no lingering threads
157-
* also no need to check anything, as this is final, we just can go to status STOPPED
162+
*
163+
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may not be {@code null}
158164
*/
159-
void shutDown() {
165+
void shutDown(Runnable stoppedListener) {
166+
assert stoppedListener != null;
160167
logger.info("stopping watch service, reason [shutdown initiated]");
161-
executionService.pause();
168+
executionService.pause(stoppedListener);
162169
triggerService.stop();
163170
stopExecutor();
164-
logger.debug("watch service has stopped");
165171
}
166172

167173
void stopExecutor() {
@@ -185,7 +191,7 @@ void reload(ClusterState state, String reason) {
185191
processedClusterStateVersion.set(state.getVersion());
186192

187193
triggerService.pauseExecution();
188-
int cancelledTaskCount = executionService.clearExecutionsAndQueue();
194+
int cancelledTaskCount = executionService.clearExecutionsAndQueue(() -> {});
189195
logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
190196

191197
executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false),
@@ -256,7 +262,7 @@ private synchronized boolean reloadInner(ClusterState state, String reason, bool
256262
*/
257263
public void pauseExecution(String reason) {
258264
triggerService.pauseExecution();
259-
int cancelledTaskCount = executionService.pause();
265+
int cancelledTaskCount = executionService.pause(() -> {});
260266
logger.info("paused watch execution, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
261267
}
262268

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55
*/
66
package org.elasticsearch.xpack.watcher.execution;
77

8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
810
import org.apache.lucene.util.SetOnce;
911
import org.elasticsearch.common.unit.TimeValue;
12+
import org.elasticsearch.xpack.core.watcher.WatcherState;
1013

1114
import java.util.Iterator;
1215
import java.util.concurrent.ConcurrentHashMap;
@@ -19,6 +22,7 @@
1922

2023
public final class CurrentExecutions implements Iterable<ExecutionService.WatchExecution> {
2124

25+
private static final Logger logger = LogManager.getLogger(CurrentExecutions.class);
2226
private final ConcurrentMap<String, ExecutionService.WatchExecution> currentExecutions = new ConcurrentHashMap<>();
2327
// the condition of the lock is used to wait and signal the finishing of all executions on shutdown
2428
private final ReentrantLock lock = new ReentrantLock();
@@ -63,9 +67,12 @@ public void remove(String id) {
6367
* Calling this method makes the class stop accepting new executions and throws and exception instead.
6468
* In addition it waits for a certain amount of time for current executions to finish before returning
6569
*
66-
* @param maxStopTimeout The maximum wait time to wait to current executions to finish
70+
* @param maxStopTimeout The maximum wait time to wait to current executions to finish
71+
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
72+
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
6773
*/
68-
void sealAndAwaitEmpty(TimeValue maxStopTimeout) {
74+
void sealAndAwaitEmpty(TimeValue maxStopTimeout, Runnable stoppedListener) {
75+
assert stoppedListener != null;
6976
lock.lock();
7077
// We may have current executions still going on.
7178
// We should try to wait for the current executions to have completed.
@@ -81,6 +88,8 @@ void sealAndAwaitEmpty(TimeValue maxStopTimeout) {
8188
} catch (InterruptedException e) {
8289
Thread.currentThread().interrupt();
8390
} finally {
91+
//fully stop Watcher after all executions are finished
92+
stoppedListener.run();
8493
lock.unlock();
8594
}
8695
}

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.common.xcontent.json.JsonXContent;
3939
import org.elasticsearch.index.engine.DocumentMissingException;
4040
import org.elasticsearch.index.engine.VersionConflictEngineException;
41+
import org.elasticsearch.xpack.core.watcher.WatcherState;
4142
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
4243
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
4344
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
@@ -135,23 +136,31 @@ public void unPause() {
135136
* Pausing means, that no new watch executions will be done unless this pausing is explicitly unset.
136137
* This is important when watcher is stopped, so that scheduled watches do not accidentally get executed.
137138
* This should not be used when we need to reload watcher based on some cluster state changes, then just calling
138-
* {@link #clearExecutionsAndQueue()} is the way to go
139+
* {@link #clearExecutionsAndQueue(Runnable)} is the way to go
140+
*
141+
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
142+
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
139143
*
140144
* @return the number of tasks that have been removed
141145
*/
142-
public int pause() {
146+
public int pause(Runnable stoppedListener) {
147+
assert stoppedListener != null;
143148
paused.set(true);
144-
return clearExecutionsAndQueue();
149+
return clearExecutionsAndQueue(stoppedListener);
145150
}
146151

147152
/**
148153
* Empty the currently queued tasks and wait for current executions to finish.
149154
*
155+
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
156+
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
157+
*
150158
* @return the number of tasks that have been removed
151159
*/
152-
public int clearExecutionsAndQueue() {
160+
public int clearExecutionsAndQueue(Runnable stoppedListener) {
161+
assert stoppedListener != null;
153162
int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>());
154-
this.clearExecutions();
163+
this.clearExecutions(stoppedListener);
155164
return cancelledTaskCount;
156165
}
157166

@@ -278,8 +287,10 @@ public WatchRecord execute(WatchExecutionContext ctx) {
278287
ctx.setNodeId(clusterService.localNode().getId());
279288
WatchRecord record = null;
280289
final String watchId = ctx.id().watchId();
290+
//pull this to a local reference since the class reference can be swapped, and need to ensure same object is used for put/remove
291+
final CurrentExecutions currentExecutions = this.currentExecutions.get();
281292
try {
282-
boolean executionAlreadyExists = currentExecutions.get().put(watchId, new WatchExecution(ctx, Thread.currentThread()));
293+
boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread()));
283294
if (executionAlreadyExists) {
284295
logger.trace("not executing watch [{}] because it is already queued", watchId);
285296
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool");
@@ -334,7 +345,7 @@ record = createWatchRecord(record, ctx, e);
334345

335346
triggeredWatchStore.delete(ctx.id());
336347
}
337-
currentExecutions.get().remove(watchId);
348+
currentExecutions.remove(watchId);
338349
logger.debug("finished [{}]/[{}]", watchId, ctx.id());
339350
}
340351
return record;
@@ -577,11 +588,15 @@ public Counters executionTimes() {
577588
/**
578589
* This clears out the current executions and sets new empty current executions
579590
* This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea
591+
*
592+
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
593+
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
580594
*/
581-
private void clearExecutions() {
595+
private void clearExecutions(Runnable stoppedListener) {
596+
assert stoppedListener != null;
582597
final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions());
583598
// clear old executions in background, no need to wait
584-
genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout));
599+
genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout, stoppedListener));
585600
}
586601

587602
// the watch execution task takes another runnable as parameter

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.xpack.core.watcher.WatcherState;
3535
import org.elasticsearch.xpack.core.watcher.watch.Watch;
3636
import org.junit.Before;
37+
import org.mockito.ArgumentCaptor;
3738
import org.mockito.stubbing.Answer;
3839

3940
import java.util.Collections;
@@ -133,8 +134,8 @@ public void testShutdown() {
133134
when(watcherService.validate(clusterState)).thenReturn(true);
134135

135136
lifeCycleService.shutDown();
136-
verify(watcherService, never()).stop(anyString());
137-
verify(watcherService, times(1)).shutDown();
137+
verify(watcherService, never()).stop(anyString(), any());
138+
verify(watcherService, times(1)).shutDown(any());
138139

139140
reset(watcherService);
140141
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
@@ -175,7 +176,12 @@ public void testManualStartStop() {
175176
.build();
176177

177178
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stoppedClusterState, clusterState));
178-
verify(watcherService, times(1)).stop(eq("watcher manually marked to shutdown by cluster state update"));
179+
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
180+
verify(watcherService, times(1))
181+
.stop(eq("watcher manually marked to shutdown by cluster state update"), captor.capture());
182+
assertEquals(WatcherState.STOPPING, lifeCycleService.getState());
183+
captor.getValue().run();
184+
assertEquals(WatcherState.STOPPED, lifeCycleService.getState());
179185

180186
// Starting via cluster state update, as the watcher metadata block is removed/set to true
181187
reset(watcherService);

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,8 @@ void stopExecutor() {
269269
csBuilder.metaData(MetaData.builder());
270270

271271
service.reload(csBuilder.build(), "whatever");
272-
verify(executionService).clearExecutionsAndQueue();
273-
verify(executionService, never()).pause();
272+
verify(executionService).clearExecutionsAndQueue(any());
273+
verify(executionService, never()).pause(any());
274274
verify(triggerService).pauseExecution();
275275
}
276276

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -520,10 +520,12 @@ protected void stopWatcher() throws Exception {
520520
WatcherStatsResponse watcherStatsResponse = new WatcherStatsRequestBuilder(client()).get();
521521
assertThat(watcherStatsResponse.hasFailures(), is(false));
522522
List<Tuple<String, WatcherState>> currentStatesFromStatsRequest = watcherStatsResponse.getNodes().stream()
523-
.map(response -> Tuple.tuple(response.getNode().getName(), response.getWatcherState()))
524-
.collect(Collectors.toList());
523+
.map(response -> Tuple.tuple(response.getNode().getName() + " (" + response.getThreadPoolQueueSize() + ")",
524+
response.getWatcherState())).collect(Collectors.toList());
525525
List<WatcherState> states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList());
526526

527+
528+
527529
logger.info("waiting to stop watcher, current states {}", currentStatesFromStatsRequest);
528530

529531
boolean isAllStateStarted = states.stream().allMatch(w -> w == WatcherState.STARTED);
@@ -548,7 +550,7 @@ protected void stopWatcher() throws Exception {
548550
}
549551

550552
throw new AssertionError("unexpected state, retrying with next run");
551-
});
553+
}, 30, TimeUnit.SECONDS);
552554
}
553555

554556
public static class NoopEmailService extends EmailService {

x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.junit.Before;
2020

2121
import java.util.Collections;
22+
import java.util.concurrent.TimeUnit;
2223

2324
import static java.util.Collections.emptyList;
2425
import static java.util.Collections.emptyMap;
@@ -108,7 +109,7 @@ public void stopWatcher() throws Exception {
108109
default:
109110
throw new AssertionError("unknown state[" + state + "]");
110111
}
111-
});
112+
}, 30, TimeUnit.SECONDS);
112113
}
113114

114115
@Override

x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.io.IOException;
2424
import java.util.Map;
25+
import java.util.concurrent.TimeUnit;
2526
import java.util.concurrent.atomic.AtomicReference;
2627

2728
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -116,7 +117,7 @@ public void stopWatcher() throws Exception {
116117
} catch (IOException e) {
117118
throw new AssertionError(e);
118119
}
119-
});
120+
}, 30, TimeUnit.SECONDS);
120121

121122
adminClient().performRequest(new Request("DELETE", "/my_test_index"));
122123
}

x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.io.IOException;
2323
import java.util.Map;
24+
import java.util.concurrent.TimeUnit;
2425
import java.util.concurrent.atomic.AtomicReference;
2526

2627
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -93,7 +94,7 @@ public void stopWatcher() throws Exception {
9394
default:
9495
throw new AssertionError("unknown state[" + state + "]");
9596
}
96-
});
97+
}, 30, TimeUnit.SECONDS);
9798
}
9899

99100
@Override

0 commit comments

Comments
 (0)