Skip to content

Fixed missed stopping of SchedulerEngine #39193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,12 @@ public void start(Collection<Job> jobs) {
/**
 * Stops the scheduler engine.
 *
 * Requests an immediate shutdown of the underlying scheduler and then waits once, for at most
 * five seconds, for it to terminate. A warning is logged if termination does not complete in
 * that time. If the calling thread is interrupted while waiting, a warning is logged and the
 * thread's interrupt flag is restored so callers can still observe the interruption.
 */
public void stop() {
    scheduler.shutdownNow();
    try {
        // wait exactly once so the timeout matches the 5s reported in the warning below
        final boolean terminated = scheduler.awaitTermination(5L, TimeUnit.SECONDS);
        if (terminated == false) {
            logger.warn("scheduler engine was not terminated after waiting 5s");
        }
    } catch (InterruptedException e) {
        logger.warn("interrupted while waiting for scheduler engine termination");
        // preserve the interrupt status for the caller
        Thread.currentThread().interrupt();
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
private final Settings settings;
private final boolean transportClientMode;
private final SetOnce<DataFrameTransformsConfigManager> dataFrameTransformsConfigManager = new SetOnce<>();
private final SetOnce<SchedulerEngine> schedulerEngine = new SetOnce<>();

public DataFrame(Settings settings) {
this.settings = settings;
Expand Down Expand Up @@ -201,12 +202,12 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
return emptyList();
}

SchedulerEngine schedulerEngine = new SchedulerEngine(settings, Clock.systemUTC());
schedulerEngine.set(new SchedulerEngine(settings, Clock.systemUTC()));

// the transforms config manager should have been created
assert dataFrameTransformsConfigManager.get() != null;
return Collections.singletonList(
new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(), schedulerEngine, threadPool));
return Collections.singletonList(new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(),
schedulerEngine.get(), threadPool));
}

@Override
Expand All @@ -223,4 +224,11 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
DataFrameTransformState::fromXContent)
);
}

/**
 * Stops the scheduler engine held by this plugin, if one has been set.
 */
@Override
public void close() {
    if (schedulerEngine.get() == null) {
        // no engine was ever set, so there is nothing to stop
        return;
    }
    schedulerEngine.get().stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,4 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(),
(DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, schedulerEngine, threadPool, headers);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.Lifecycle.State;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -53,8 +54,6 @@ public class IndexLifecycleService
private final PolicyStepsRegistry policyRegistry;
private final IndexLifecycleRunner lifecycleRunner;
private final Settings settings;
private final ThreadPool threadPool;
private Client client;
private ClusterService clusterService;
private LongSupplier nowSupplier;
private SchedulerEngine.Job scheduledJob;
Expand All @@ -63,13 +62,11 @@ public IndexLifecycleService(Settings settings, Client client, ClusterService cl
LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry) {
super();
this.settings = settings;
this.client = client;
this.clusterService = clusterService;
this.clock = clock;
this.nowSupplier = nowSupplier;
this.scheduledJob = null;
this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client);
this.threadPool = threadPool;
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, threadPool, nowSupplier);
this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
clusterService.addStateApplier(this);
Expand Down Expand Up @@ -158,14 +155,21 @@ SchedulerEngine.Job getScheduledJob() {
return scheduledJob;
}

private void maybeScheduleJob() {
private synchronized void maybeScheduleJob() {
if (this.isMaster) {
if (scheduler.get() == null) {
scheduler.set(new SchedulerEngine(settings, clock));
scheduler.get().register(this);
// don't create scheduler if the node is shutting down
if (isClusterServiceStoppedOrClosed() == false) {
scheduler.set(new SchedulerEngine(settings, clock));
scheduler.get().register(this);
}
}

// scheduler could be null if the node might be shutting down
if (scheduler.get() != null) {
scheduledJob = new SchedulerEngine.Job(XPackField.INDEX_LIFECYCLE, new TimeValueSchedule(pollInterval));
scheduler.get().add(scheduledJob);
}
scheduledJob = new SchedulerEngine.Job(XPackField.INDEX_LIFECYCLE, new TimeValueSchedule(pollInterval));
scheduler.get().add(scheduledJob);
}
}

Expand Down Expand Up @@ -254,7 +258,11 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange)
}

@Override
public void close() {
public synchronized void close() {
// this assertion is here to ensure that the check we use in maybeScheduleJob is accurate for detecting a shutdown in
// progress, which is that the cluster service is stopped and closed at some point prior to closing plugins
assert isClusterServiceStoppedOrClosed() : "close is called by closing the plugin, which is expected to happen after " +
"the cluster service is stopped";
SchedulerEngine engine = scheduler.get();
if (engine != null) {
engine.stop();
Expand All @@ -265,4 +273,13 @@ public void submitOperationModeUpdate(OperationMode mode) {
clusterService.submitStateUpdateTask("ilm_operation_mode_update",
new OperationModeUpdateTask(mode));
}

/**
 * Reports whether the cluster service's lifecycle state is {@code STOPPED} or {@code CLOSED}.
 *
 * @return {@code true} if the current lifecycle state is one of those two states,
 *         {@code false} otherwise
 */
private boolean isClusterServiceStoppedOrClosed() {
    final State current = clusterService.lifecycleState();
    if (current == State.STOPPED) {
        return true;
    }
    return current == State.CLOSED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.Lifecycle.State;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
Expand Down Expand Up @@ -91,6 +92,7 @@ public void prepareServices() {
Settings settings = Settings.builder().put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s").build();
when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings,
Collections.singleton(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING)));
when(clusterService.lifecycleState()).thenReturn(State.STARTED);

Client client = mock(Client.class);
AdminClient adminClient = mock(AdminClient.class);
Expand All @@ -108,6 +110,7 @@ public void prepareServices() {

/**
 * Test teardown: makes the mocked cluster service report a randomly chosen stopped or closed
 * state, then closes the index lifecycle service and shuts down the thread pool.
 */
@After
public void cleanup() {
    // pick the state up front; close() is invoked only after the mock reports it
    final State terminalState = randomFrom(State.STOPPED, State.CLOSED);
    when(clusterService.lifecycleState()).thenReturn(terminalState);

    indexLifecycleService.close();
    threadPool.shutdownNow();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.rollup;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
Expand Down Expand Up @@ -104,6 +105,7 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication"));


private final SetOnce<SchedulerEngine> schedulerEngine = new SetOnce<>();
private final Settings settings;
private final boolean enabled;
private final boolean transportClientMode;
Expand Down Expand Up @@ -195,12 +197,19 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
return emptyList();
}

SchedulerEngine schedulerEngine = new SchedulerEngine(settings, getClock());
return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(client, schedulerEngine, threadPool));
schedulerEngine.set(new SchedulerEngine(settings, getClock()));
return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(client, schedulerEngine.get(), threadPool));
}

/**
 * Supplies the clock handed to the scheduler engine. Tests may override this method.
 *
 * @return the system clock in the UTC time zone
 */
protected Clock getClock() {
    final Clock utcClock = Clock.systemUTC();
    return utcClock;
}

/**
 * Stops the scheduler engine held by this plugin; does nothing when no engine has been set.
 */
@Override
public void close() {
    final boolean engineMissing = schedulerEngine.get() == null;
    if (engineMissing) {
        return;
    }
    schedulerEngine.get().stop();
}
}