Skip to content

Commit 73a672b

Browse files
committed
Fix Watcher stats class cast exception (#39821)
The watcher stats implementation tries to look at all queued watches before preparing the result. We want to cast these to a WatchExecutionTask to extract the context to prepare the stats for queued watches. The problem is that not all tasks on the watcher queue were WatchExecutionTask. This is because a manually executed watch was not even at all wrapped in a WatchExecutionTask. Moreover, we were using ExecutorService#submit(Runnable) which would wrap the Runnable in a FutureTask<?>. This commit addresses this by using a WatchExecutionTask, and also using ExecutorService#execute(Runnable) so that no wrapping occurs. This will let us continue with the assumption that all queued tasks are WatchExecutionTasks.
1 parent 3246d3e commit 73a672b

File tree

3 files changed

+177
-37
lines changed

3 files changed

+177
-37
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -534,12 +534,12 @@ private void clearExecutions() {
534534
// the watch execution task takes another runnable as parameter
535535
// the best solution would be to move the whole execute() method, which is handed over as ctor parameter
536536
// over into this class, this is the quicker way though
537-
static final class WatchExecutionTask implements Runnable {
537+
public static final class WatchExecutionTask implements Runnable {
538538

539539
private final WatchExecutionContext ctx;
540540
private final Runnable runnable;
541541

542-
WatchExecutionTask(WatchExecutionContext ctx, Runnable runnable) {
542+
public WatchExecutionTask(WatchExecutionContext ctx, Runnable runnable) {
543543
this.ctx = ctx;
544544
this.runnable = runnable;
545545
}

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/TransportExecuteWatchAction.java

Lines changed: 51 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.transport.TransportService;
2626
import org.elasticsearch.xpack.core.XPackField;
2727
import org.elasticsearch.xpack.core.watcher.execution.ActionExecutionMode;
28+
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
2829
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
2930
import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams;
3031
import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchAction;
@@ -110,48 +111,63 @@ protected void doExecute(ExecuteWatchRequest request, ActionListener<ExecuteWatc
110111
}
111112
}
112113

113-
private void executeWatch(ExecuteWatchRequest request, ActionListener<ExecuteWatchResponse> listener,
114-
Watch watch, boolean knownWatch) {
115-
116-
threadPool.executor(XPackField.WATCHER).submit(new AbstractRunnable() {
117-
@Override
118-
public void onFailure(Exception e) {
119-
listener.onFailure(e);
114+
private void executeWatch(
115+
final ExecuteWatchRequest request,
116+
final ActionListener<ExecuteWatchResponse> listener,
117+
final Watch watch,
118+
final boolean knownWatch) {
119+
try {
120+
/*
121+
* Ensure that the headers from the incoming request are used instead those of the stored watch otherwise the watch would run
122+
* as the user who stored the watch, but it needs to run as the user who executes this request.
123+
*/
124+
final Map<String, String> headers = new HashMap<>(threadPool.getThreadContext().getHeaders());
125+
watch.status().setHeaders(headers);
126+
127+
final String triggerType = watch.trigger().type();
128+
final TriggerEvent triggerEvent = triggerService.simulateEvent(triggerType, watch.id(), request.getTriggerData());
129+
130+
final ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(
131+
watch,
132+
knownWatch,
133+
new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod());
134+
135+
final ZonedDateTime executionTime = clock.instant().atZone(ZoneOffset.UTC);
136+
ctxBuilder.executionTime(executionTime);
137+
for (final Map.Entry<String, ActionExecutionMode> entry : request.getActionModes().entrySet()) {
138+
ctxBuilder.actionMode(entry.getKey(), entry.getValue());
139+
}
140+
if (request.getAlternativeInput() != null) {
141+
ctxBuilder.withInput(new SimpleInput.Result(new Payload.Simple(request.getAlternativeInput())));
120142
}
143+
if (request.isIgnoreCondition()) {
144+
ctxBuilder.withCondition(InternalAlwaysCondition.RESULT_INSTANCE);
145+
}
146+
ctxBuilder.recordExecution(request.isRecordExecution());
147+
final WatchExecutionContext ctx = ctxBuilder.build();
121148

122-
@Override
123-
protected void doRun() throws Exception {
124-
// ensure that the headers from the incoming request are used instead those of the stored watch
125-
// otherwise the watch would run as the user who stored the watch, but it needs to be run as the user who
126-
// executes this request
127-
Map<String, String> headers = new HashMap<>(threadPool.getThreadContext().getHeaders());
128-
watch.status().setHeaders(headers);
149+
// use execute so that the runnable is not wrapped in a RunnableFuture<?>
150+
threadPool.executor(XPackField.WATCHER).execute(new ExecutionService.WatchExecutionTask(ctx, new AbstractRunnable() {
129151

130-
String triggerType = watch.trigger().type();
131-
TriggerEvent triggerEvent = triggerService.simulateEvent(triggerType, watch.id(), request.getTriggerData());
152+
@Override
153+
public void onFailure(final Exception e) {
154+
listener.onFailure(e);
155+
}
132156

133-
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, knownWatch,
134-
new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod());
157+
@Override
158+
protected void doRun() throws Exception {
159+
final WatchRecord record = executionService.execute(ctx);
160+
final XContentBuilder builder = XContentFactory.jsonBuilder();
135161

136-
ZonedDateTime executionTime = clock.instant().atZone(ZoneOffset.UTC);
137-
ctxBuilder.executionTime(executionTime);
138-
for (Map.Entry<String, ActionExecutionMode> entry : request.getActionModes().entrySet()) {
139-
ctxBuilder.actionMode(entry.getKey(), entry.getValue());
162+
record.toXContent(builder, WatcherParams.builder().hideSecrets(true).debug(request.isDebug()).build());
163+
listener.onResponse(new ExecuteWatchResponse(record.id().value(), BytesReference.bytes(builder), XContentType.JSON));
140164
}
141-
if (request.getAlternativeInput() != null) {
142-
ctxBuilder.withInput(new SimpleInput.Result(new Payload.Simple(request.getAlternativeInput())));
143-
}
144-
if (request.isIgnoreCondition()) {
145-
ctxBuilder.withCondition(InternalAlwaysCondition.RESULT_INSTANCE);
146-
}
147-
ctxBuilder.recordExecution(request.isRecordExecution());
148165

149-
WatchRecord record = executionService.execute(ctxBuilder.build());
150-
XContentBuilder builder = XContentFactory.jsonBuilder();
166+
}));
167+
} catch (final Exception e) {
168+
listener.onFailure(e);
169+
}
170+
151171

152-
record.toXContent(builder, WatcherParams.builder().hideSecrets(true).debug(request.isDebug()).build());
153-
listener.onResponse(new ExecuteWatchResponse(record.id().value(), BytesReference.bytes(builder), XContentType.JSON));
154-
}
155-
});
156172
}
157173
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.watcher.execution;
8+
9+
import org.elasticsearch.action.ActionFuture;
10+
import org.elasticsearch.action.FailedNodeException;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.common.unit.TimeValue;
13+
import org.elasticsearch.xpack.core.watcher.client.WatchSourceBuilder;
14+
import org.elasticsearch.xpack.core.watcher.client.WatcherClient;
15+
import org.elasticsearch.xpack.core.watcher.execution.ActionExecutionMode;
16+
import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchRequest;
17+
import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchResponse;
18+
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
19+
import org.elasticsearch.xpack.watcher.actions.index.IndexAction;
20+
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
21+
import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEvent;
22+
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
23+
24+
import java.io.IOException;
25+
import java.time.ZoneOffset;
26+
import java.time.ZonedDateTime;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.concurrent.BrokenBarrierException;
30+
import java.util.concurrent.CyclicBarrier;
31+
import java.util.concurrent.ExecutionException;
32+
33+
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
34+
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
35+
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
36+
import static org.hamcrest.Matchers.empty;
37+
38+
public class ExecuteWatchQueuedStatsTests extends AbstractWatcherIntegrationTestCase {
39+
40+
@Override
41+
protected Settings nodeSettings(int nodeOrdinal) {
42+
// we use a small thread pool to force executions to be queued
43+
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("xpack.watcher.thread_pool.size", 1).build();
44+
}
45+
46+
@Override
47+
protected boolean timeWarped() {
48+
return false;
49+
}
50+
51+
/*
52+
* This test is effectively forcing a manually executed watch to end up queued while we simultaneously try to get stats, including
53+
* queued watches. The reason that we do this is because previously a manually executed watch would be queued as a FutureTask<?> while
54+
* we try to cast queued watches to WatchExecutionTask. This would previously result in a ClassCastException. This test fails when that
55+
* happens yet succeeds with the production code change that accompanies this test.
56+
*/
57+
public void testQueuedStats() throws ExecutionException, InterruptedException {
58+
final WatcherClient client = new WatcherClient(client());
59+
client.preparePutWatch("id")
60+
.setActive(true)
61+
.setSource(
62+
new WatchSourceBuilder()
63+
.input(simpleInput("payload", "yes"))
64+
.trigger(schedule(interval("1s")))
65+
.addAction(
66+
"action",
67+
TimeValue.timeValueSeconds(1),
68+
IndexAction.builder("test_index", "acknowledgement").setDocId("id")))
69+
.get();
70+
71+
final int numberOfIterations = 128 - scaledRandomIntBetween(0, 128);
72+
73+
final CyclicBarrier barrier = new CyclicBarrier(2);
74+
75+
final List<ActionFuture<ExecuteWatchResponse>> futures = new ArrayList<>(numberOfIterations);
76+
final Thread executeWatchThread = new Thread(() -> {
77+
try {
78+
barrier.await();
79+
} catch (final BrokenBarrierException | InterruptedException e) {
80+
fail(e.toString());
81+
}
82+
for (int i = 0; i < numberOfIterations; i++) {
83+
final ExecuteWatchRequest request = new ExecuteWatchRequest("id");
84+
try {
85+
request.setTriggerEvent(new ManualTriggerEvent(
86+
"id-" + i,
87+
new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC), ZonedDateTime.now(ZoneOffset.UTC))));
88+
} catch (final IOException e) {
89+
fail(e.toString());
90+
}
91+
request.setActionMode("_all", ActionExecutionMode.EXECUTE);
92+
request.setRecordExecution(true);
93+
futures.add(client.executeWatch(request));
94+
}
95+
});
96+
executeWatchThread.start();
97+
98+
final List<FailedNodeException> failures = new ArrayList<>();
99+
final Thread watcherStatsThread = new Thread(() -> {
100+
try {
101+
barrier.await();
102+
} catch (final BrokenBarrierException | InterruptedException e) {
103+
fail(e.toString());
104+
}
105+
for (int i = 0; i < numberOfIterations; i++) {
106+
final WatcherStatsResponse response = client.prepareWatcherStats().setIncludeQueuedWatches(true).get();
107+
failures.addAll(response.failures());
108+
}
109+
});
110+
watcherStatsThread.start();
111+
112+
executeWatchThread.join();
113+
watcherStatsThread.join();
114+
115+
for (final ActionFuture<ExecuteWatchResponse> future : futures) {
116+
future.get();
117+
}
118+
119+
assertThat(failures, empty());
120+
121+
client.prepareDeleteWatch("id").get();
122+
}
123+
124+
}

0 commit comments

Comments
 (0)