Skip to content

Commit 5295c8e

Browse files
jaymodeweizijun
authored andcommitted
Fixed missed stopping of SchedulerEngine (elastic#39193)
The SchedulerEngine is used in several places in our code and not all of these usages properly stopped the SchedulerEngine, which could lead to test failures due to leaked threads from the SchedulerEngine. This change adds stopping to these usages in order to avoid the thread leaks that cause CI failures and noise. Closes elastic#38875
1 parent 3914d24 commit 5295c8e

File tree

6 files changed

+58
-17
lines changed

6 files changed

+58
-17
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,12 @@ public void start(Collection<Job> jobs) {
126126
public void stop() {
127127
scheduler.shutdownNow();
128128
try {
129-
scheduler.awaitTermination(5, TimeUnit.SECONDS);
129+
final boolean terminated = scheduler.awaitTermination(5L, TimeUnit.SECONDS);
130+
if (terminated == false) {
131+
logger.warn("scheduler engine was not terminated after waiting 5s");
132+
}
130133
} catch (InterruptedException e) {
134+
logger.warn("interrupted while waiting for scheduler engine termination");
131135
Thread.currentThread().interrupt();
132136
}
133137
}

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
100100
private final Settings settings;
101101
private final boolean transportClientMode;
102102
private final SetOnce<DataFrameTransformsConfigManager> dataFrameTransformsConfigManager = new SetOnce<>();
103+
private final SetOnce<SchedulerEngine> schedulerEngine = new SetOnce<>();
103104

104105
public DataFrame(Settings settings) {
105106
this.settings = settings;
@@ -201,12 +202,12 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
201202
return emptyList();
202203
}
203204

204-
SchedulerEngine schedulerEngine = new SchedulerEngine(settings, Clock.systemUTC());
205+
schedulerEngine.set(new SchedulerEngine(settings, Clock.systemUTC()));
205206

206207
// the transforms config manager should have been created
207208
assert dataFrameTransformsConfigManager.get() != null;
208-
return Collections.singletonList(
209-
new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(), schedulerEngine, threadPool));
209+
return Collections.singletonList(new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(),
210+
schedulerEngine.get(), threadPool));
210211
}
211212

212213
@Override
@@ -223,4 +224,11 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
223224
DataFrameTransformState::fromXContent)
224225
);
225226
}
227+
228+
@Override
229+
public void close() {
230+
if (schedulerEngine.get() != null) {
231+
schedulerEngine.get().stop();
232+
}
233+
}
226234
}

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,4 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
6868
return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(),
6969
(DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, schedulerEngine, threadPool, headers);
7070
}
71-
}
71+
}

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.cluster.metadata.IndexMetaData;
1919
import org.elasticsearch.cluster.service.ClusterService;
2020
import org.elasticsearch.common.Strings;
21+
import org.elasticsearch.common.component.Lifecycle.State;
2122
import org.elasticsearch.common.settings.Settings;
2223
import org.elasticsearch.common.unit.TimeValue;
2324
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -53,8 +54,6 @@ public class IndexLifecycleService
5354
private final PolicyStepsRegistry policyRegistry;
5455
private final IndexLifecycleRunner lifecycleRunner;
5556
private final Settings settings;
56-
private final ThreadPool threadPool;
57-
private Client client;
5857
private ClusterService clusterService;
5958
private LongSupplier nowSupplier;
6059
private SchedulerEngine.Job scheduledJob;
@@ -63,13 +62,11 @@ public IndexLifecycleService(Settings settings, Client client, ClusterService cl
6362
LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry) {
6463
super();
6564
this.settings = settings;
66-
this.client = client;
6765
this.clusterService = clusterService;
6866
this.clock = clock;
6967
this.nowSupplier = nowSupplier;
7068
this.scheduledJob = null;
7169
this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client);
72-
this.threadPool = threadPool;
7370
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, threadPool, nowSupplier);
7471
this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
7572
clusterService.addStateApplier(this);
@@ -158,14 +155,21 @@ SchedulerEngine.Job getScheduledJob() {
158155
return scheduledJob;
159156
}
160157

161-
private void maybeScheduleJob() {
158+
private synchronized void maybeScheduleJob() {
162159
if (this.isMaster) {
163160
if (scheduler.get() == null) {
164-
scheduler.set(new SchedulerEngine(settings, clock));
165-
scheduler.get().register(this);
161+
// don't create scheduler if the node is shutting down
162+
if (isClusterServiceStoppedOrClosed() == false) {
163+
scheduler.set(new SchedulerEngine(settings, clock));
164+
scheduler.get().register(this);
165+
}
166+
}
167+
168+
// scheduler could be null if the node might be shutting down
169+
if (scheduler.get() != null) {
170+
scheduledJob = new SchedulerEngine.Job(XPackField.INDEX_LIFECYCLE, new TimeValueSchedule(pollInterval));
171+
scheduler.get().add(scheduledJob);
166172
}
167-
scheduledJob = new SchedulerEngine.Job(XPackField.INDEX_LIFECYCLE, new TimeValueSchedule(pollInterval));
168-
scheduler.get().add(scheduledJob);
169173
}
170174
}
171175

@@ -254,7 +258,11 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange)
254258
}
255259

256260
@Override
257-
public void close() {
261+
public synchronized void close() {
262+
// this assertion is here to ensure that the check we use in maybeScheduleJob is accurate for detecting a shutdown in
263+
// progress, which is that the cluster service is stopped and closed at some point prior to closing plugins
264+
assert isClusterServiceStoppedOrClosed() : "close is called by closing the plugin, which is expected to happen after " +
265+
"the cluster service is stopped";
258266
SchedulerEngine engine = scheduler.get();
259267
if (engine != null) {
260268
engine.stop();
@@ -265,4 +273,13 @@ public void submitOperationModeUpdate(OperationMode mode) {
265273
clusterService.submitStateUpdateTask("ilm_operation_mode_update",
266274
new OperationModeUpdateTask(mode));
267275
}
276+
277+
/**
278+
* Method that checks if the lifecycle state of the cluster service is stopped or closed. This
279+
* enhances the readability of the code.
280+
*/
281+
private boolean isClusterServiceStoppedOrClosed() {
282+
final State state = clusterService.lifecycleState();
283+
return state == State.STOPPED || state == State.CLOSED;
284+
}
268285
}

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.cluster.node.DiscoveryNodes;
2020
import org.elasticsearch.cluster.service.ClusterService;
2121
import org.elasticsearch.common.collect.ImmutableOpenMap;
22+
import org.elasticsearch.common.component.Lifecycle.State;
2223
import org.elasticsearch.common.settings.ClusterSettings;
2324
import org.elasticsearch.common.settings.Settings;
2425
import org.elasticsearch.common.transport.TransportAddress;
@@ -91,6 +92,7 @@ public void prepareServices() {
9192
Settings settings = Settings.builder().put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s").build();
9293
when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings,
9394
Collections.singleton(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING)));
95+
when(clusterService.lifecycleState()).thenReturn(State.STARTED);
9496

9597
Client client = mock(Client.class);
9698
AdminClient adminClient = mock(AdminClient.class);
@@ -108,6 +110,7 @@ public void prepareServices() {
108110

109111
@After
110112
public void cleanup() {
113+
when(clusterService.lifecycleState()).thenReturn(randomFrom(State.STOPPED, State.CLOSED));
111114
indexLifecycleService.close();
112115
threadPool.shutdownNow();
113116
}

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.rollup;
77

8+
import org.apache.lucene.util.SetOnce;
89
import org.elasticsearch.Version;
910
import org.elasticsearch.action.ActionRequest;
1011
import org.elasticsearch.action.ActionResponse;
@@ -104,6 +105,7 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
104105
new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication"));
105106

106107

108+
private final SetOnce<SchedulerEngine> schedulerEngine = new SetOnce<>();
107109
private final Settings settings;
108110
private final boolean enabled;
109111
private final boolean transportClientMode;
@@ -195,12 +197,19 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
195197
return emptyList();
196198
}
197199

198-
SchedulerEngine schedulerEngine = new SchedulerEngine(settings, getClock());
199-
return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(client, schedulerEngine, threadPool));
200+
schedulerEngine.set(new SchedulerEngine(settings, getClock()));
201+
return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(client, schedulerEngine.get(), threadPool));
200202
}
201203

202204
// overridable by tests
203205
protected Clock getClock() {
204206
return Clock.systemUTC();
205207
}
208+
209+
@Override
210+
public void close() {
211+
if (schedulerEngine.get() != null) {
212+
schedulerEngine.get().stop();
213+
}
214+
}
206215
}

0 commit comments

Comments
 (0)