Skip to content

Watcher: Make start/stop cycle more predictable and synchronous #30118

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 3, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,26 @@ private void waitForWatcher() throws Exception {
// ensure watcher is started, so that a test can stop watcher and everything still works fine
if (isWatcherTest()) {
assertBusy(() -> {
try {
ClientYamlTestResponse response =
getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap());
String state = (String) response.evaluate("stats.0.watcher_state");
if ("started".equals(state) == false) {
getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap());
}
// assertion required to exit the assertBusy lambda
assertThat(state, is("started"));
} catch (IOException e) {
throw new AssertionError(e);
ClientYamlTestResponse response =
getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap());
String state = (String) response.evaluate("stats.0.watcher_state");

switch (state) {
case "stopped":
ClientYamlTestResponse startResponse =
getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap());
boolean isAcknowledged = (boolean) startResponse.evaluate("acknowledged");
assertThat(isAcknowledged, is(true));
break;
case "stopping":
throw new AssertionError("waiting until stopping state reached stopped state to start again");
case "starting":
throw new AssertionError("waiting until starting state reached started state");
case "started":
// all good here, we are done
break;
default:
throw new AssertionError("unknown state[" + state + "]");
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
final WatchParser watchParser = new WatchParser(settings, triggerService, registry, inputRegistry, cryptoService, getClock());

final ExecutionService executionService = new ExecutionService(settings, historyStore, triggeredWatchStore, watchExecutor,
getClock(), watchParser, clusterService, client);
getClock(), watchParser, clusterService, client, threadPool.generic());

final Consumer<Iterable<TriggerEvent>> triggerEngineListener = getTriggerEngineListener(executionService);
triggerService.register(triggerEngineListener);
Expand All @@ -360,7 +360,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
watchParser, client);

final WatcherLifeCycleService watcherLifeCycleService =
new WatcherLifeCycleService(settings, threadPool, clusterService, watcherService);
new WatcherLifeCycleService(settings, clusterService, watcherService);

listener = new WatcherIndexingListener(settings, watchParser, getClock(), triggerService);
clusterService.addListener(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id());
if (shouldBeTriggered) {
if (watch.status().state().isActive()) {
logger.debug("adding watch [{}] to trigger", watch.id());
logger.debug("adding watch [{}] to trigger service", watch.id());
triggerService.add(watch);
} else {
logger.debug("removing watch [{}] to trigger", watch.id());
logger.debug("removing watch [{}] to trigger service", watch.id());
triggerService.remove(watch.id());
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,19 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.upgrade.UpgradeField;
import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;

public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {

Expand All @@ -54,30 +44,14 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
public static final Setting<Boolean> SETTING_REQUIRE_MANUAL_START =
Setting.boolSetting("xpack.watcher.require_manual_start", false, Property.NodeScope);

private static final String LIFECYCLE_THREADPOOL_NAME = "watcher-lifecycle";

private final WatcherService watcherService;
private final ExecutorService executor;
private AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
private final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STARTED);
private final AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
private final boolean requireManualStart;
private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
private volatile WatcherService watcherService;

WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
WatcherService watcherService) {
// use a single thread executor so that lifecycle changes are handled in the order they
// are submitted in
this(settings, clusterService, watcherService, EsExecutors.newFixed(
LIFECYCLE_THREADPOOL_NAME,
1,
1000,
daemonThreadFactory(settings, LIFECYCLE_THREADPOOL_NAME),
threadPool.getThreadContext()));
}

WatcherLifeCycleService(Settings settings, ClusterService clusterService,
WatcherService watcherService, ExecutorService executorService) {
WatcherLifeCycleService(Settings settings, ClusterService clusterService, WatcherService watcherService) {
super(settings);
this.executor = executorService;
this.watcherService = watcherService;
this.requireManualStart = SETTING_REQUIRE_MANUAL_START.get(settings);
clusterService.addListener(this);
Expand All @@ -91,58 +65,12 @@ public void beforeStop() {
});
}

/**
 * Stops the watcher service, forwarding the supplied reason to it.
 * Declared synchronized, so calls are serialized on this lifecycle service instance.
 *
 * @param reason description of why watcher is being stopped, passed through to the watcher service
 */
public synchronized void stop(String reason) {
watcherService.stop(reason);
}

synchronized void shutDown() {
this.state.set(WatcherState.STOPPING);
shutDown = true;
stop("shutdown initiated");
stopExecutor();
}

/**
 * Hands the lifecycle executor to ThreadPool.terminate with a timeout of 10 seconds.
 * NOTE(review): presumably this blocks until queued lifecycle tasks finish or the timeout elapses - confirm
 * against ThreadPool.terminate.
 */
void stopExecutor() {
ThreadPool.terminate(executor, 10L, TimeUnit.SECONDS);
}

private synchronized void start(ClusterState state) {
if (shutDown) {
return;
}
final WatcherState watcherState = watcherService.state();
if (watcherState != WatcherState.STOPPED) {
logger.debug("not starting watcher. watcher can only start if its current state is [{}], but its current state now is [{}]",
WatcherState.STOPPED, watcherState);
return;
}

// If we start from a cluster state update we need to check if previously we stopped manually
// otherwise Watcher would start upon the next cluster state update while the user instructed Watcher to not run
WatcherMetaData watcherMetaData = state.getMetaData().custom(WatcherMetaData.TYPE);
if (watcherMetaData != null && watcherMetaData.manuallyStopped()) {
logger.debug("not starting watcher. watcher was stopped manually and therefore cannot be auto-started");
return;
}

// ensure that templates are existing before starting watcher
// the watcher index template registry is independent from watcher being started or stopped
if (WatcherIndexTemplateRegistry.validate(state) == false) {
logger.debug("not starting watcher, watcher templates are missing in the cluster state");
return;
}

if (watcherService.validate(state)) {
logger.trace("starting... (based on cluster state version [{}])", state.getVersion());
try {
// we need to populate the allocation ids before the next cluster state listener comes in
checkAndSetAllocationIds(state, false);
watcherService.start(state);
} catch (Exception e) {
logger.warn("failed to start watcher. please wait for the cluster to become ready or try to start Watcher manually", e);
}
} else {
logger.debug("not starting watcher. because the cluster isn't ready yet to run watcher");
}
clearAllocationIds();
watcherService.shutDown();
this.state.set(WatcherState.STOPPED);
}

/**
Expand All @@ -169,106 +97,92 @@ public void clusterChanged(ClusterChangedEvent event) {
}

if (Strings.isNullOrEmpty(event.state().nodes().getMasterNodeId())) {
clearAllocationIds();
executor.execute(() -> this.stop("no master node"));
pauseExecution("no master node");
return;
}

if (event.state().getBlocks().hasGlobalBlock(ClusterBlockLevel.WRITE)) {
clearAllocationIds();
executor.execute(() -> this.stop("write level cluster block"));
pauseExecution("write level cluster block");
return;
}

if (isWatcherStoppedManually(event.state())) {
clearAllocationIds();
executor.execute(() -> this.stop("watcher manually marked to shutdown by cluster state update"));
} else {
final WatcherState watcherState = watcherService.state();
if (watcherState == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) {
checkAndSetAllocationIds(event.state(), true);
} else if (watcherState != WatcherState.STARTED && watcherState != WatcherState.STARTING) {
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
IndexMetaData triggeredWatchesIndexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME,
event.state().metaData());
boolean isIndexInternalFormatWatchIndex = watcherIndexMetaData == null ||
UpgradeField.checkInternalIndexFormat(watcherIndexMetaData);
boolean isIndexInternalFormatTriggeredWatchIndex = triggeredWatchesIndexMetaData == null ||
UpgradeField.checkInternalIndexFormat(triggeredWatchesIndexMetaData);
if (isIndexInternalFormatTriggeredWatchIndex && isIndexInternalFormatWatchIndex) {
checkAndSetAllocationIds(event.state(), false);
executor.execute(() -> start(event.state()));
} else {
logger.warn("not starting watcher, upgrade API run required: .watches[{}], .triggered_watches[{}]",
isIndexInternalFormatWatchIndex, isIndexInternalFormatTriggeredWatchIndex);
}
}
boolean isWatcherStoppedManually = isWatcherStoppedManually(event.state());
// if this is not a data node, we need to start it ourselves possibly
if (event.state().nodes().getLocalNode().isDataNode() == false &&
isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) {
watcherService.start(event.state());
this.state.set(WatcherState.STARTED);
return;
}
}

/**
 * Check if watcher has been stopped manually via the stop API.
 *
 * Looks up the custom WatcherMetaData entry in the metadata of the given cluster state.
 *
 * @param state the cluster state whose metadata is inspected
 * @return true only if watcher metadata is present and flagged as manually stopped;
 *         false if the metadata is absent or not flagged
 */
private boolean isWatcherStoppedManually(ClusterState state) {
WatcherMetaData watcherMetaData = state.getMetaData().custom(WatcherMetaData.TYPE);
return watcherMetaData != null && watcherMetaData.manuallyStopped();
}

/**
* check and optionally set the current allocation ids
*
* @param state the current cluster state
* @param callWatcherService should the watcher service be called for starting/stopping/reloading or should this be treated as a
* dryrun so that the caller is responsible for this
*/
private void checkAndSetAllocationIds(ClusterState state, boolean callWatcherService) {
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData());
if (watcherIndexMetaData == null) {
if (clearAllocationIds() && callWatcherService) {
executor.execute(wrapWatcherService(() -> watcherService.pauseExecution("no watcher index found"),
e -> logger.error("error pausing watch execution", e)));
if (isWatcherStoppedManually) {
if (this.state.get() == WatcherState.STARTED) {
clearAllocationIds();
watcherService.stop("watcher manually marked to shutdown by cluster state update");
this.state.set(WatcherState.STOPPED);
}
return;
}

DiscoveryNode localNode = state.nodes().getLocalNode();
RoutingNode routingNode = state.getRoutingNodes().node(localNode.getId());
// this can happen if the node does not hold any data
DiscoveryNode localNode = event.state().nodes().getLocalNode();
RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId());
if (routingNode == null) {
if (clearAllocationIds() && callWatcherService) {
executor.execute(wrapWatcherService(
() -> watcherService.pauseExecution("no routing node for local node found, network issue?"),
e -> logger.error("error pausing watch execution", e)));
}
pauseExecution("routing node in cluster state undefined. network issue?");
return;
}

IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
if (watcherIndexMetaData == null) {
pauseExecution("no watcher index found");
return;
}

String watchIndex = watcherIndexMetaData.getIndex().getName();
List<ShardRouting> localShards = routingNode.shardsWithState(watchIndex, RELOCATING, STARTED);
// no local shards, empty out watcher and don't waste resources!
if (localShards.isEmpty()) {
if (clearAllocationIds() && callWatcherService) {
executor.execute(wrapWatcherService(() -> watcherService.pauseExecution("no local watcher shards found"),
e -> logger.error("error pausing watch execution", e)));
}
pauseExecution("no local watcher shards found");
return;
}

List<String> currentAllocationIds = localShards.stream()
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.collect(Collectors.toList());
Collections.sort(currentAllocationIds);
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.sorted()
.collect(Collectors.toList());

if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds));
if (callWatcherService) {
executor.execute(wrapWatcherService(() -> watcherService.reload(state, "new local watcher shard allocation ids"),
e -> logger.error("error reloading watcher", e)));
if (watcherService.validate(event.state())) {
previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds));
if (state.get() == WatcherState.STARTED) {
watcherService.reload(event.state(), "new local watcher shard allocation ids");
} else if (state.get() == WatcherState.STOPPED) {
watcherService.start(event.state());
this.state.set(WatcherState.STARTED);
}
} else {
clearAllocationIds();
this.state.set(WatcherState.STOPPED);
}
}
}

/**
 * Pauses watch execution for the given reason.
 *
 * The watcher service is only asked to pause when clearAllocationIds() returns true, i.e. when there
 * were allocation ids to clear. The lifecycle state is set to STARTED in either case.
 *
 * @param reason description of why execution is paused, passed through to the watcher service
 */
private void pauseExecution(String reason) {
if (clearAllocationIds()) {
watcherService.pauseExecution(reason);
}
// NOTE(review): the state becomes STARTED (not STOPPED) after pausing - presumably pausing only halts
// execution while the lifecycle itself stays started; confirm this is intended, since it also
// overwrites a previously STOPPED state.
this.state.set(WatcherState.STARTED);
}

/**
 * Check if watcher has been stopped manually via the stop API.
 *
 * Looks up the custom WatcherMetaData entry in the metadata of the given cluster state.
 *
 * @param state the cluster state whose metadata is inspected
 * @return true only if watcher metadata is present and flagged as manually stopped;
 *         false if the metadata is absent or not flagged
 */
private boolean isWatcherStoppedManually(ClusterState state) {
WatcherMetaData watcherMetaData = state.getMetaData().custom(WatcherMetaData.TYPE);
return watcherMetaData != null && watcherMetaData.manuallyStopped();
}

/**
/**
* clear out current allocation ids if not already happened
* @return true, if existing allocation ids were cleaned out, false otherwise
Expand All @@ -283,26 +197,7 @@ List<String> allocationIds() {
return previousAllocationIds.get();
}

/**
* Wraps an abstract runnable to easier supply onFailure and doRun methods via lambdas
* This ensures that the uncaught exception handler in the executing threadpool does not get called
*
* @param run The code to be executed in the runnable
* @param exceptionConsumer The exception handling code to be executed, if the runnable fails
* @return The AbstractRunnable instance to pass to the executor
*/
private static AbstractRunnable wrapWatcherService(Runnable run, Consumer<Exception> exceptionConsumer) {

return new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
exceptionConsumer.accept(e);
}

@Override
protected void doRun() throws Exception {
run.run();
}
};
/**
 * Returns the watcher state currently tracked by this lifecycle service.
 *
 * @return the value currently held in the state reference
 */
public WatcherState getState() {
return state.get();
}
}
Loading