Skip to content

Commit 0a2ef59

Browse files
authored
Watcher: Fix check for currently executed watches (#31137)
The ack watch action has a check for currently executed watches, to make sure that currently running watches cannot be acknowledged. This check only checked on the coordinating node for watches being executed, but should have checked the whole cluster using a WatcherStatsRequest, which is being switched to in this commit.
1 parent 4328470 commit 0a2ef59

File tree

2 files changed

+86
-62
lines changed

2 files changed

+86
-62
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java

+57-54
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@
2525
import org.elasticsearch.rest.RestStatus;
2626
import org.elasticsearch.transport.TransportService;
2727
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
28-
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
2928
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchAction;
3029
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest;
3130
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse;
31+
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction;
32+
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest;
3233
import org.elasticsearch.xpack.core.watcher.watch.Watch;
3334
import org.elasticsearch.xpack.core.watcher.watch.WatchField;
34-
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
3535
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
3636
import org.elasticsearch.xpack.watcher.watch.WatchParser;
3737
import org.joda.time.DateTime;
@@ -49,83 +49,86 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
4949

5050
private final Clock clock;
5151
private final WatchParser parser;
52-
private ExecutionService executionService;
5352
private final Client client;
5453

5554
@Inject
5655
public TransportAckWatchAction(Settings settings, TransportService transportService, ActionFilters actionFilters,
57-
Clock clock, XPackLicenseState licenseState, WatchParser parser, ExecutionService executionService,
56+
Clock clock, XPackLicenseState licenseState, WatchParser parser,
5857
Client client) {
5958
super(settings, AckWatchAction.NAME, transportService, actionFilters, licenseState, AckWatchRequest::new);
6059
this.clock = clock;
6160
this.parser = parser;
62-
this.executionService = executionService;
6361
this.client = client;
6462
}
6563

6664
@Override
6765
protected void doExecute(AckWatchRequest request, ActionListener<AckWatchResponse> listener) {
68-
// if the watch to be acked is running currently, reject this request
69-
List<WatchExecutionSnapshot> snapshots = executionService.currentExecutions();
70-
boolean isWatchRunning = snapshots.stream().anyMatch(s -> s.watchId().equals(request.getWatchId()));
71-
if (isWatchRunning) {
72-
listener.onFailure(new ElasticsearchStatusException("watch[{}] is running currently, cannot ack until finished",
66+
WatcherStatsRequest watcherStatsRequest = new WatcherStatsRequest();
67+
watcherStatsRequest.includeCurrentWatches(true);
68+
69+
executeAsyncWithOrigin(client, WATCHER_ORIGIN, WatcherStatsAction.INSTANCE, watcherStatsRequest, ActionListener.wrap(response -> {
70+
boolean isWatchRunning = response.getNodes().stream()
71+
.anyMatch(node -> node.getSnapshots().stream().anyMatch(snapshot -> snapshot.watchId().equals(request.getWatchId())));
72+
if (isWatchRunning) {
73+
listener.onFailure(new ElasticsearchStatusException("watch[{}] is running currently, cannot ack until finished",
7374
RestStatus.CONFLICT, request.getWatchId()));
74-
return;
75-
}
76-
77-
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
78-
.preference(Preference.LOCAL.type()).realtime(true);
79-
80-
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
81-
ActionListener.<GetResponse>wrap((response) -> {
82-
if (response.isExists() == false) {
83-
listener.onFailure(new ResourceNotFoundException("Watch with id [{}] does not exist", request.getWatchId()));
84-
} else {
85-
DateTime now = new DateTime(clock.millis(), UTC);
86-
Watch watch = parser.parseWithSecrets(request.getWatchId(), true, response.getSourceAsBytesRef(),
75+
} else {
76+
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
77+
.preference(Preference.LOCAL.type()).realtime(true);
78+
79+
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
80+
ActionListener.<GetResponse>wrap(getResponse -> {
81+
if (getResponse.isExists() == false) {
82+
listener.onFailure(new ResourceNotFoundException("Watch with id [{}] does not exist", request.getWatchId()));
83+
} else {
84+
DateTime now = new DateTime(clock.millis(), UTC);
85+
Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(),
8786
now, XContentType.JSON);
88-
watch.version(response.getVersion());
89-
watch.status().version(response.getVersion());
90-
String[] actionIds = request.getActionIds();
91-
if (actionIds == null || actionIds.length == 0) {
92-
actionIds = new String[]{WatchField.ALL_ACTIONS_ID};
93-
}
87+
watch.version(getResponse.getVersion());
88+
watch.status().version(getResponse.getVersion());
89+
String[] actionIds = request.getActionIds();
90+
if (actionIds == null || actionIds.length == 0) {
91+
actionIds = new String[]{WatchField.ALL_ACTIONS_ID};
92+
}
9493

95-
// exit early in case nothing changes
96-
boolean isChanged = watch.ack(now, actionIds);
97-
if (isChanged == false) {
98-
listener.onResponse(new AckWatchResponse(watch.status()));
99-
return;
100-
}
94+
// exit early in case nothing changes
95+
boolean isChanged = watch.ack(now, actionIds);
96+
if (isChanged == false) {
97+
listener.onResponse(new AckWatchResponse(watch.status()));
98+
return;
99+
}
101100

102-
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
103-
// this may reject this action, but prevents concurrent updates from a watch execution
104-
updateRequest.version(response.getVersion());
105-
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
106-
XContentBuilder builder = jsonBuilder();
107-
builder.startObject()
101+
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
102+
// this may reject this action, but prevents concurrent updates from a watch execution
103+
updateRequest.version(getResponse.getVersion());
104+
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
105+
XContentBuilder builder = jsonBuilder();
106+
builder.startObject()
108107
.startObject(WatchField.STATUS.getPreferredName())
109108
.startObject("actions");
110109

111-
List<String> actionIdsAsList = Arrays.asList(actionIds);
112-
boolean updateAll = actionIdsAsList.contains("_all");
113-
for (ActionWrapper actionWrapper : watch.actions()) {
114-
if (updateAll || actionIdsAsList.contains(actionWrapper.id())) {
115-
builder.startObject(actionWrapper.id())
110+
List<String> actionIdsAsList = Arrays.asList(actionIds);
111+
boolean updateAll = actionIdsAsList.contains("_all");
112+
for (ActionWrapper actionWrapper : watch.actions()) {
113+
if (updateAll || actionIdsAsList.contains(actionWrapper.id())) {
114+
builder.startObject(actionWrapper.id())
116115
.field("ack", watch.status().actionStatus(actionWrapper.id()).ackStatus(), ToXContent.EMPTY_PARAMS)
117116
.endObject();
117+
}
118118
}
119-
}
120119

121-
builder.endObject().endObject().endObject();
122-
updateRequest.doc(builder);
120+
builder.endObject().endObject().endObject();
121+
updateRequest.doc(builder);
123122

124-
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest,
123+
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest,
125124
ActionListener.<UpdateResponse>wrap(
126-
(updateResponse) -> listener.onResponse(new AckWatchResponse(watch.status())),
127-
listener::onFailure), client::update);
128-
}
129-
}, listener::onFailure), client::get);
125+
(updateResponse) -> listener.onResponse(new AckWatchResponse(watch.status())),
126+
listener::onFailure), client::update);
127+
}
128+
}, listener::onFailure), client::get);
129+
130+
}
131+
132+
}, listener::onFailure));
130133
}
131134
}

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java

+29-8
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@
66
package org.elasticsearch.xpack.watcher.transport.actions.ack;
77

88
import org.elasticsearch.ElasticsearchException;
9+
import org.elasticsearch.Version;
910
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.action.get.GetResponse;
1112
import org.elasticsearch.action.support.ActionFilters;
13+
import org.elasticsearch.action.support.ContextPreservingActionListener;
1214
import org.elasticsearch.action.support.PlainActionFuture;
1315
import org.elasticsearch.client.Client;
16+
import org.elasticsearch.cluster.ClusterName;
17+
import org.elasticsearch.cluster.node.DiscoveryNode;
1418
import org.elasticsearch.common.bytes.BytesArray;
1519
import org.elasticsearch.common.settings.Settings;
1620
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -20,11 +24,13 @@
2024
import org.elasticsearch.test.ESTestCase;
2125
import org.elasticsearch.threadpool.ThreadPool;
2226
import org.elasticsearch.transport.TransportService;
27+
import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
2328
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
2429
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest;
2530
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse;
31+
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction;
32+
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
2633
import org.elasticsearch.xpack.core.watcher.watch.Watch;
27-
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
2834
import org.elasticsearch.xpack.watcher.watch.WatchParser;
2935
import org.junit.Before;
3036

@@ -34,14 +40,14 @@
3440

3541
import static org.hamcrest.Matchers.is;
3642
import static org.mockito.Matchers.anyObject;
43+
import static org.mockito.Matchers.eq;
3744
import static org.mockito.Mockito.doAnswer;
3845
import static org.mockito.Mockito.mock;
3946
import static org.mockito.Mockito.when;
4047

4148
public class TransportAckWatchActionTests extends ESTestCase {
4249

4350
private TransportAckWatchAction action;
44-
private ExecutionService executionService;
4551
private Client client;
4652

4753
@Before
@@ -51,11 +57,10 @@ public void setupAction() {
5157
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
5258
when(threadPool.getThreadContext()).thenReturn(threadContext);
5359
WatchParser watchParser = mock(WatchParser.class);
54-
executionService = mock(ExecutionService.class);
5560
client = mock(Client.class);
5661
when(client.threadPool()).thenReturn(threadPool);
5762
action = new TransportAckWatchAction(Settings.EMPTY, transportService, new ActionFilters(Collections.emptySet()),
58-
Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, executionService, client);
63+
Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, client);
5964
}
6065

6166
public void testWatchNotFound() {
@@ -67,6 +72,13 @@ public void testWatchNotFound() {
6772
return null;
6873
}).when(client).get(anyObject(), anyObject());
6974

75+
doAnswer(invocation -> {
76+
ContextPreservingActionListener listener = (ContextPreservingActionListener) invocation.getArguments()[2];
77+
listener.onResponse(new WatcherStatsResponse(new ClusterName("clusterName"), new WatcherMetaData(false),
78+
Collections.emptyList(), Collections.emptyList()));
79+
return null;
80+
}).when(client).execute(eq(WatcherStatsAction.INSTANCE), anyObject(), anyObject());
81+
7082
AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId);
7183
PlainActionFuture<AckWatchResponse> listener = PlainActionFuture.newFuture();
7284
action.doExecute(ackWatchRequest, listener);
@@ -78,9 +90,18 @@ public void testWatchNotFound() {
7890

7991
public void testThatWatchCannotBeAckedWhileRunning() {
8092
String watchId = "my_watch_id";
81-
WatchExecutionSnapshot snapshot = mock(WatchExecutionSnapshot.class);
82-
when(snapshot.watchId()).thenReturn(watchId);
83-
when(executionService.currentExecutions()).thenReturn(Collections.singletonList(snapshot));
93+
94+
doAnswer(invocation -> {
95+
ContextPreservingActionListener listener = (ContextPreservingActionListener) invocation.getArguments()[2];
96+
DiscoveryNode discoveryNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
97+
WatcherStatsResponse.Node node = new WatcherStatsResponse.Node(discoveryNode);
98+
WatchExecutionSnapshot snapshot = mock(WatchExecutionSnapshot.class);
99+
when(snapshot.watchId()).thenReturn(watchId);
100+
node.setSnapshots(Collections.singletonList(snapshot));
101+
listener.onResponse(new WatcherStatsResponse(new ClusterName("clusterName"),
102+
new WatcherMetaData(false), Collections.singletonList(node), Collections.emptyList()));
103+
return null;
104+
}).when(client).execute(eq(WatcherStatsAction.INSTANCE), anyObject(), anyObject());
84105

85106
AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId);
86107
PlainActionFuture<AckWatchResponse> listener = PlainActionFuture.newFuture();
@@ -91,4 +112,4 @@ public void testThatWatchCannotBeAckedWhileRunning() {
91112
assertThat(e.getMessage(), is("watch[my_watch_id] is running currently, cannot ack until finished"));
92113
assertThat(e.status(), is(RestStatus.CONFLICT));
93114
}
94-
}
115+
}

0 commit comments

Comments
 (0)