Skip to content

Commit b57318c

Browse files
authored
Execute watcher lifecycle changes in order (elastic/x-pack-elasticsearch#4437)
This change updates the WatcherLifecycleService to have its own single thread executor that is used for lifecycle changes in order to have a guarantee for the order that the changes are executed in. Previously, a runnable would be submitted to the generic threadpool for each lifecycle change that is needed. There was no guarantee of ordering for these changes and no checks to see if a state change was already in flight. Relates elastic/x-pack-elasticsearch#4429 Original commit: elastic/x-pack-elasticsearch@14b7338
1 parent 91ab88e commit b57318c

File tree

3 files changed

+36
-7
lines changed

3 files changed

+36
-7
lines changed

plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.settings.Setting.Property;
2323
import org.elasticsearch.common.settings.Settings;
2424
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
25+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2526
import org.elasticsearch.gateway.GatewayService;
2627
import org.elasticsearch.threadpool.ThreadPool;
2728
import org.elasticsearch.xpack.core.upgrade.UpgradeField;
@@ -35,12 +36,14 @@
3536
import java.util.Collections;
3637
import java.util.List;
3738
import java.util.concurrent.ExecutorService;
39+
import java.util.concurrent.TimeUnit;
3840
import java.util.concurrent.atomic.AtomicReference;
3941
import java.util.function.Consumer;
4042
import java.util.stream.Collectors;
4143

4244
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
4345
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
46+
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
4447

4548
public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
4649

@@ -51,6 +54,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
5154
public static final Setting<Boolean> SETTING_REQUIRE_MANUAL_START =
5255
Setting.boolSetting("xpack.watcher.require_manual_start", false, Property.NodeScope);
5356

57+
private static final String LIFECYCLE_THREADPOOL_NAME = "watcher-lifecycle";
58+
5459
private final WatcherService watcherService;
5560
private final ExecutorService executor;
5661
private AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
@@ -59,8 +64,20 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
5964

6065
WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
6166
WatcherService watcherService) {
67+
// use a single thread executor so that lifecycle changes are handled in the order they
68+
// are submitted in
69+
this(settings, clusterService, watcherService, EsExecutors.newFixed(
70+
LIFECYCLE_THREADPOOL_NAME,
71+
1,
72+
1000,
73+
daemonThreadFactory(settings, LIFECYCLE_THREADPOOL_NAME),
74+
threadPool.getThreadContext()));
75+
}
76+
77+
WatcherLifeCycleService(Settings settings, ClusterService clusterService,
78+
WatcherService watcherService, ExecutorService executorService) {
6279
super(settings);
63-
this.executor = threadPool.executor(ThreadPool.Names.GENERIC);
80+
this.executor = executorService;
6481
this.watcherService = watcherService;
6582
this.requireManualStart = SETTING_REQUIRE_MANUAL_START.get(settings);
6683
clusterService.addListener(this);
@@ -81,6 +98,11 @@ public synchronized void stop(String reason) {
8198
synchronized void shutDown() {
8299
shutDown = true;
83100
stop("shutdown initiated");
101+
stopExecutor();
102+
}
103+
104+
void stopExecutor() {
105+
ThreadPool.terminate(executor, 10L, TimeUnit.SECONDS);
84106
}
85107

86108
private synchronized void start(ClusterState state) {

plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ public void start(ClusterState clusterState) throws Exception {
139139
state.set(WatcherState.STOPPED);
140140
throw e;
141141
}
142+
} else {
143+
logger.debug("could not transition state from stopped to starting, current state [{}]", state.get());
142144
}
143145
}
144146

@@ -157,6 +159,8 @@ public void stop(String reason) {
157159
executionService.stop();
158160
state.set(WatcherState.STOPPED);
159161
logger.debug("watch service has stopped");
162+
} else {
163+
logger.debug("could not transition state from started to stopping, current state [{}]", state.get());
160164
}
161165
} catch (Exception e) {
162166
state.set(WatcherState.STOPPED);

plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.watcher;
77

8-
import com.carrotsearch.randomizedtesting.annotations.Repeat;
98
import org.elasticsearch.ElasticsearchSecurityException;
109
import org.elasticsearch.Version;
1110
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@@ -27,12 +26,12 @@
2726
import org.elasticsearch.cluster.service.ClusterService;
2827
import org.elasticsearch.common.settings.Settings;
2928
import org.elasticsearch.common.util.concurrent.EsExecutors;
29+
import org.elasticsearch.common.util.concurrent.ThreadContext;
3030
import org.elasticsearch.discovery.DiscoverySettings;
3131
import org.elasticsearch.gateway.GatewayService;
3232
import org.elasticsearch.index.Index;
3333
import org.elasticsearch.index.shard.ShardId;
3434
import org.elasticsearch.test.ESTestCase;
35-
import org.elasticsearch.test.VersionUtils;
3635
import org.elasticsearch.threadpool.ThreadPool;
3736
import org.elasticsearch.xpack.core.watcher.WatcherState;
3837
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
@@ -43,7 +42,6 @@
4342
import java.util.Collections;
4443
import java.util.HashSet;
4544
import java.util.List;
46-
import java.util.concurrent.ExecutorService;
4745
import java.util.stream.Collectors;
4846
import java.util.stream.IntStream;
4947

@@ -73,8 +71,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
7371
@Before
7472
public void prepareServices() {
7573
ThreadPool threadPool = mock(ThreadPool.class);
76-
final ExecutorService executorService = EsExecutors.newDirectExecutorService();
77-
when(threadPool.executor(anyString())).thenReturn(executorService);
74+
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
7875
ClusterService clusterService = mock(ClusterService.class);
7976
Answer<Object> answer = invocationOnMock -> {
8077
AckedClusterStateUpdateTask updateTask = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1];
@@ -83,7 +80,13 @@ public void prepareServices() {
8380
};
8481
doAnswer(answer).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
8582
watcherService = mock(WatcherService.class);
86-
lifeCycleService = new WatcherLifeCycleService(Settings.EMPTY, threadPool, clusterService, watcherService);
83+
lifeCycleService = new WatcherLifeCycleService(Settings.EMPTY, clusterService, watcherService,
84+
EsExecutors.newDirectExecutorService()) {
85+
@Override
86+
void stopExecutor() {
87+
// direct executor cannot be terminated
88+
}
89+
};
8790
}
8891

8992
public void testStartAndStopCausedByClusterState() throws Exception {

0 commit comments

Comments
 (0)