From 48737bf2f932a3fd8e9fb2f4ea401091a83bd1b1 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 6 Jun 2018 14:50:52 +0200 Subject: [PATCH] Watcher: Fix check for currently executed watches The ack watch action has a check for currently executed watches, to make sure that currently running watches cannot be acknowledged. This check only checked on the local node for watches being executed, but should have checked the whole cluster using a WatcherStatsRequest, which is being switched to in this commit. --- .../actions/ack/TransportAckWatchAction.java | 111 +++++++++--------- .../ack/TransportAckWatchActionTests.java | 37 ++++-- 2 files changed, 86 insertions(+), 62 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java index 52c63cab69cab..e2c6d6af67658 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java @@ -27,13 +27,13 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; -import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchAction; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.WatchField; -import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction; import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.joda.time.DateTime; @@ -51,84 +51,87 @@ public class TransportAckWatchAction extends WatcherTransportAction listener) { - // if the watch to be acked is running currently, reject this request - List snapshots = executionService.currentExecutions(); - boolean isWatchRunning = snapshots.stream().anyMatch(s -> s.watchId().equals(request.getWatchId())); - if (isWatchRunning) { - listener.onFailure(new ElasticsearchStatusException("watch[{}] is running currently, cannot ack until finished", + WatcherStatsRequest watcherStatsRequest = new WatcherStatsRequest(); + watcherStatsRequest.includeCurrentWatches(true); + + executeAsyncWithOrigin(client, WATCHER_ORIGIN, WatcherStatsAction.INSTANCE, watcherStatsRequest, ActionListener.wrap(response -> { + boolean isWatchRunning = response.getNodes().stream() + .anyMatch(node -> node.getSnapshots().stream().anyMatch(snapshot -> snapshot.watchId().equals(request.getWatchId()))); + if (isWatchRunning) { + listener.onFailure(new ElasticsearchStatusException("watch[{}] is running currently, cannot ack until finished", RestStatus.CONFLICT, request.getWatchId())); - return; - } - - GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId()) - .preference(Preference.LOCAL.type()).realtime(true); - - executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest, - ActionListener.wrap((response) -> { - if (response.isExists() == false) { - listener.onFailure(new ResourceNotFoundException("Watch with id [{}] does not exist", request.getWatchId())); - } else { - DateTime now = new DateTime(clock.millis(), UTC); - Watch watch = parser.parseWithSecrets(request.getWatchId(), true, response.getSourceAsBytesRef(), + } else { + GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId()) + .preference(Preference.LOCAL.type()).realtime(true); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest, + ActionListener.wrap(getResponse -> { + if (getResponse.isExists() == false) { + listener.onFailure(new ResourceNotFoundException("Watch with id [{}] does not exist", request.getWatchId())); + } else { + DateTime now = new DateTime(clock.millis(), UTC); + Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(), now, XContentType.JSON); - watch.version(response.getVersion()); - watch.status().version(response.getVersion()); - String[] actionIds = request.getActionIds(); - if (actionIds == null || actionIds.length == 0) { - actionIds = new String[]{WatchField.ALL_ACTIONS_ID}; - } + watch.version(getResponse.getVersion()); + watch.status().version(getResponse.getVersion()); + String[] actionIds = request.getActionIds(); + if (actionIds == null || actionIds.length == 0) { + actionIds = new String[]{WatchField.ALL_ACTIONS_ID}; + } - // exit early in case nothing changes - boolean isChanged = watch.ack(now, actionIds); - if (isChanged == false) { - listener.onResponse(new AckWatchResponse(watch.status())); - return; - } + // exit early in case nothing changes + boolean isChanged = watch.ack(now, actionIds); + if (isChanged == false) { + listener.onResponse(new AckWatchResponse(watch.status())); + return; + } - UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId()); - // this may reject this action, but prevents concurrent updates from a watch execution - updateRequest.version(response.getVersion()); - updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - XContentBuilder builder = jsonBuilder(); - builder.startObject() + UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId()); + // this may reject this action, but prevents concurrent updates from a watch execution + updateRequest.version(getResponse.getVersion()); + updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + XContentBuilder builder = jsonBuilder(); + builder.startObject() .startObject(WatchField.STATUS.getPreferredName()) .startObject("actions"); - List actionIdsAsList = Arrays.asList(actionIds); - boolean updateAll = actionIdsAsList.contains("_all"); - for (ActionWrapper actionWrapper : watch.actions()) { - if (updateAll || actionIdsAsList.contains(actionWrapper.id())) { - builder.startObject(actionWrapper.id()) + List actionIdsAsList = Arrays.asList(actionIds); + boolean updateAll = actionIdsAsList.contains("_all"); + for (ActionWrapper actionWrapper : watch.actions()) { + if (updateAll || actionIdsAsList.contains(actionWrapper.id())) { + builder.startObject(actionWrapper.id()) .field("ack", watch.status().actionStatus(actionWrapper.id()).ackStatus(), ToXContent.EMPTY_PARAMS) .endObject(); + } } - } - builder.endObject().endObject().endObject(); - updateRequest.doc(builder); + builder.endObject().endObject().endObject(); + updateRequest.doc(builder); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest, + executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest, ActionListener.wrap( - (updateResponse) -> listener.onResponse(new AckWatchResponse(watch.status())), - listener::onFailure), client::update); - } - }, listener::onFailure), client::get); + (updateResponse) -> listener.onResponse(new AckWatchResponse(watch.status())), + listener::onFailure), client::update); + } + }, listener::onFailure), client::get); + + } + + }, listener::onFailure)); } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java index d1b04ea55660d..df2017591c377 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java @@ -6,12 +6,16 @@ package org.elasticsearch.xpack.watcher.transport.actions.ack; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -21,11 +25,13 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.watcher.WatcherMetaData; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse; import org.elasticsearch.xpack.core.watcher.watch.Watch; -import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.junit.Before; @@ -35,6 +41,7 @@ import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -42,7 +49,6 @@ public class TransportAckWatchActionTests extends ESTestCase { private TransportAckWatchAction action; - private ExecutionService executionService; private Client client; @Before @@ -52,12 +58,11 @@ public void setupAction() { ThreadContext threadContext = new ThreadContext(Settings.EMPTY); when(threadPool.getThreadContext()).thenReturn(threadContext); WatchParser watchParser = mock(WatchParser.class); - executionService = mock(ExecutionService.class); client = mock(Client.class); when(client.threadPool()).thenReturn(threadPool); action = new TransportAckWatchAction(Settings.EMPTY, transportService, threadPool, new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver(Settings.EMPTY), - Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, executionService, client); + Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, client); } public void testWatchNotFound() { @@ -69,6 +74,13 @@ public void testWatchNotFound() { return null; }).when(client).get(anyObject(), anyObject()); + doAnswer(invocation -> { + ContextPreservingActionListener listener = (ContextPreservingActionListener) invocation.getArguments()[2]; + listener.onResponse(new WatcherStatsResponse(new ClusterName("clusterName"), new WatcherMetaData(false), + Collections.emptyList(), Collections.emptyList())); + return null; + }).when(client).execute(eq(WatcherStatsAction.INSTANCE), anyObject(), anyObject()); + AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId); PlainActionFuture listener = PlainActionFuture.newFuture(); action.doExecute(ackWatchRequest, listener); @@ -80,9 +92,18 @@ public void testWatchNotFound() { public void testThatWatchCannotBeAckedWhileRunning() { String watchId = "my_watch_id"; - WatchExecutionSnapshot snapshot = mock(WatchExecutionSnapshot.class); - when(snapshot.watchId()).thenReturn(watchId); - when(executionService.currentExecutions()).thenReturn(Collections.singletonList(snapshot)); + + doAnswer(invocation -> { + ContextPreservingActionListener listener = (ContextPreservingActionListener) invocation.getArguments()[2]; + DiscoveryNode discoveryNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + WatcherStatsResponse.Node node = new WatcherStatsResponse.Node(discoveryNode); + WatchExecutionSnapshot snapshot = mock(WatchExecutionSnapshot.class); + when(snapshot.watchId()).thenReturn(watchId); + node.setSnapshots(Collections.singletonList(snapshot)); + listener.onResponse(new WatcherStatsResponse(new ClusterName("clusterName"), + new WatcherMetaData(false), Collections.singletonList(node), Collections.emptyList())); + return null; + }).when(client).execute(eq(WatcherStatsAction.INSTANCE), anyObject(), anyObject()); AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId); PlainActionFuture listener = PlainActionFuture.newFuture(); @@ -93,4 +114,4 @@ public void testThatWatchCannotBeAckedWhileRunning() { assertThat(e.getMessage(), is("watch[my_watch_id] is running currently, cannot ack until finished")); assertThat(e.status(), is(RestStatus.CONFLICT)); } -} \ No newline at end of file +}