Skip to content

Commit 9077c44

Browse files
committed
Watcher: Allow to execute actions for each element in array (#41997)
This adds the ability to execute an action for each element that occurs in an array, for example you could sent a dedicated slack action for each search hit returned from a search. There is also a limit for the number of actions executed, which is hardcoded to 100 right now, to prevent having watches run forever. The watch history logs each action result and the total number of actions the were executed. Relates #34546
1 parent 2a8f30e commit 9077c44

File tree

12 files changed

+448
-31
lines changed

12 files changed

+448
-31
lines changed

x-pack/docs/en/watcher/actions.asciidoc

+43
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,49 @@ of a watch during its execution:
192192
image::images/action-throttling.jpg[align="center"]
193193

194194

195+
[[action-foreach]]
196+
=== Running an action for each element in an array
197+
198+
You can use the `foreach` field in an action to trigger the configured action
199+
for every element within that array.
200+
201+
In order to protect from long running watches, after one hundred runs with an
202+
foreach loop the execution is gracefully stopped.
203+
204+
[source,js]
205+
--------------------------------------------------
206+
PUT _watcher/watch/log_event_watch
207+
{
208+
"trigger" : {
209+
"schedule" : { "interval" : "5m" }
210+
},
211+
"input" : {
212+
"search" : {
213+
"request" : {
214+
"indices" : "log-events",
215+
"body" : {
216+
"query" : { "match" : { "status" : "error" } }
217+
}
218+
}
219+
}
220+
},
221+
"condition" : {
222+
"compare" : { "ctx.payload.hits.total" : { "gt" : 0 } }
223+
},
224+
"actions" : {
225+
"log_hits" : {
226+
"foreach" : "ctx.payload.hits.hits", <1>
227+
"logging" : {
228+
"text" : "Found id {{ctx.payload._id}} with field {{ctx.payload._source.my_field}}"
229+
}
230+
}
231+
}
232+
}
233+
--------------------------------------------------
234+
// CONSOLE
235+
236+
<1> The logging statement will be executed for each of the returned search hits.
237+
195238
[[action-conditions]]
196239
=== Adding conditions to actions
197240

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java

+106-8
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@
77

88
import org.apache.logging.log4j.message.ParameterizedMessage;
99
import org.apache.logging.log4j.util.Supplier;
10+
import org.elasticsearch.ElasticsearchException;
1011
import org.elasticsearch.ElasticsearchParseException;
1112
import org.elasticsearch.common.Nullable;
13+
import org.elasticsearch.common.Strings;
1214
import org.elasticsearch.common.unit.TimeValue;
15+
import org.elasticsearch.common.xcontent.ObjectPath;
1316
import org.elasticsearch.common.xcontent.ToXContentObject;
1417
import org.elasticsearch.common.xcontent.XContentBuilder;
1518
import org.elasticsearch.common.xcontent.XContentParser;
1619
import org.elasticsearch.license.XPackLicenseState;
20+
import org.elasticsearch.script.JodaCompatibleZonedDateTime;
1721
import org.elasticsearch.xpack.core.watcher.actions.throttler.ActionThrottler;
1822
import org.elasticsearch.xpack.core.watcher.actions.throttler.Throttler;
1923
import org.elasticsearch.xpack.core.watcher.actions.throttler.ThrottlerField;
@@ -30,29 +34,43 @@
3034
import java.time.Clock;
3135
import java.time.ZoneOffset;
3236
import java.time.ZonedDateTime;
37+
import java.util.ArrayList;
38+
import java.util.Collection;
39+
import java.util.Collections;
40+
import java.util.HashMap;
41+
import java.util.List;
42+
import java.util.Map;
3343
import java.util.Objects;
44+
import java.util.Set;
45+
import java.util.stream.Collectors;
3446

3547
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
3648

3749
public class ActionWrapper implements ToXContentObject {
3850

51+
private final int MAXIMUM_FOREACH_RUNS = 100;
52+
3953
private String id;
4054
@Nullable
4155
private final ExecutableCondition condition;
4256
@Nullable
4357
private final ExecutableTransform<Transform, Transform.Result> transform;
4458
private final ActionThrottler throttler;
4559
private final ExecutableAction<? extends Action> action;
60+
@Nullable
61+
private String path;
4662

4763
public ActionWrapper(String id, ActionThrottler throttler,
4864
@Nullable ExecutableCondition condition,
4965
@Nullable ExecutableTransform<Transform, Transform.Result> transform,
50-
ExecutableAction<? extends Action> action) {
66+
ExecutableAction<? extends Action> action,
67+
@Nullable String path) {
5168
this.id = id;
5269
this.condition = condition;
5370
this.throttler = throttler;
5471
this.transform = transform;
5572
this.action = action;
73+
this.path = path;
5674
}
5775

5876
public String id() {
@@ -140,16 +158,90 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) {
140158
return new ActionWrapperResult(id, conditionResult, null, new Action.Result.FailureWithException(action.type(), e));
141159
}
142160
}
143-
try {
144-
Action.Result actionResult = action.execute(id, ctx, payload);
145-
return new ActionWrapperResult(id, conditionResult, transformResult, actionResult);
146-
} catch (Exception e) {
147-
action.logger().error(
161+
if (Strings.isEmpty(path)) {
162+
try {
163+
Action.Result actionResult = action.execute(id, ctx, payload);
164+
return new ActionWrapperResult(id, conditionResult, transformResult, actionResult);
165+
} catch (Exception e) {
166+
action.logger().error(
167+
(Supplier<?>) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e);
168+
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
169+
}
170+
} else {
171+
try {
172+
List<Action.Result> results = new ArrayList<>();
173+
Object object = ObjectPath.eval(path, toMap(ctx));
174+
int runs = 0;
175+
if (object instanceof Collection) {
176+
Collection collection = Collection.class.cast(object);
177+
if (collection.isEmpty()) {
178+
throw new ElasticsearchException("foreach object [{}] was an empty list, could not run any action", path);
179+
} else {
180+
for (Object o : collection) {
181+
if (runs >= MAXIMUM_FOREACH_RUNS) {
182+
break;
183+
}
184+
if (o instanceof Map) {
185+
results.add(action.execute(id, ctx, new Payload.Simple((Map<String, Object>) o)));
186+
} else {
187+
results.add(action.execute(id, ctx, new Payload.Simple("_value", o)));
188+
}
189+
runs++;
190+
}
191+
}
192+
} else if (object == null) {
193+
throw new ElasticsearchException("specified foreach object was null: [{}]", path);
194+
} else {
195+
throw new ElasticsearchException("specified foreach object was not a an array/collection: [{}]", path);
196+
}
197+
198+
// check if we have mixed results, then set to partial failure
199+
final Set<Action.Result.Status> statuses = results.stream().map(Action.Result::status).collect(Collectors.toSet());
200+
Action.Result.Status status;
201+
if (statuses.size() == 1) {
202+
status = statuses.iterator().next();
203+
} else {
204+
status = Action.Result.Status.PARTIAL_FAILURE;
205+
}
206+
207+
final int numberOfActionsExecuted = runs;
208+
return new ActionWrapperResult(id, conditionResult, transformResult,
209+
new Action.Result(action.type(), status) {
210+
@Override
211+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
212+
builder.field("number_of_actions_executed", numberOfActionsExecuted);
213+
builder.startArray(WatchField.FOREACH.getPreferredName());
214+
for (Action.Result result : results) {
215+
builder.startObject();
216+
result.toXContent(builder, params);
217+
builder.endObject();
218+
}
219+
builder.endArray();
220+
return builder;
221+
}
222+
});
223+
} catch (Exception e) {
224+
action.logger().error(
148225
(Supplier<?>) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e);
149-
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
226+
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
227+
}
150228
}
151229
}
152230

231+
private Map<String, Object> toMap(WatchExecutionContext ctx) {
232+
Map<String, Object> model = new HashMap<>();
233+
model.put("id", ctx.id().value());
234+
model.put("watch_id", ctx.id().watchId());
235+
model.put("execution_time", new JodaCompatibleZonedDateTime(ctx.executionTime().toInstant(), ZoneOffset.UTC));
236+
model.put("trigger", ctx.triggerEvent().data());
237+
model.put("metadata", ctx.watch().metadata());
238+
model.put("vars", ctx.vars());
239+
if (ctx.payload().data() != null) {
240+
model.put("payload", ctx.payload().data());
241+
}
242+
return Collections.singletonMap("ctx", model);
243+
}
244+
153245
@Override
154246
public boolean equals(Object o) {
155247
if (this == o) return true;
@@ -186,6 +278,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
186278
.field(transform.type(), transform, params)
187279
.endObject();
188280
}
281+
if (Strings.isEmpty(path) == false) {
282+
builder.field(WatchField.FOREACH.getPreferredName(), path);
283+
}
189284
builder.field(action.type(), action, params);
190285
return builder.endObject();
191286
}
@@ -198,6 +293,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
198293
ExecutableCondition condition = null;
199294
ExecutableTransform<Transform, Transform.Result> transform = null;
200295
TimeValue throttlePeriod = null;
296+
String path = null;
201297
ExecutableAction<? extends Action> action = null;
202298

203299
String currentFieldName = null;
@@ -208,6 +304,8 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
208304
} else {
209305
if (WatchField.CONDITION.match(currentFieldName, parser.getDeprecationHandler())) {
210306
condition = actionRegistry.getConditionRegistry().parseExecutable(watchId, parser);
307+
} else if (WatchField.FOREACH.match(currentFieldName, parser.getDeprecationHandler())) {
308+
path = parser.text();
211309
} else if (Transform.TRANSFORM.match(currentFieldName, parser.getDeprecationHandler())) {
212310
transform = actionRegistry.getTransformRegistry().parse(watchId, parser);
213311
} else if (ThrottlerField.THROTTLE_PERIOD.match(currentFieldName, parser.getDeprecationHandler())) {
@@ -235,7 +333,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
235333
}
236334

237335
ActionThrottler throttler = new ActionThrottler(clock, throttlePeriod, licenseState);
238-
return new ActionWrapper(actionId, throttler, condition, transform, action);
336+
return new ActionWrapper(actionId, throttler, condition, transform, action, path);
239337
}
240338

241339
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/client/WatchSourceBuilder.java

+14-3
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Transfo
101101
}
102102

103103
public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Transform transform, Action action) {
104-
actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform));
104+
actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform, null));
105105
return this;
106106
}
107107

@@ -111,7 +111,13 @@ public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Conditi
111111
}
112112

113113
public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, Action action) {
114-
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform));
114+
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform, null));
115+
return this;
116+
}
117+
118+
public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, String path,
119+
Action action) {
120+
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform, path));
115121
return this;
116122
}
117123

@@ -186,16 +192,18 @@ public final BytesReference buildAsBytes(XContentType contentType) {
186192
static class TransformedAction implements ToXContentObject {
187193

188194
private final Action action;
195+
@Nullable private String path;
189196
@Nullable private final TimeValue throttlePeriod;
190197
@Nullable private final Condition condition;
191198
@Nullable private final Transform transform;
192199

193200
TransformedAction(String id, Action action, @Nullable TimeValue throttlePeriod,
194-
@Nullable Condition condition, @Nullable Transform transform) {
201+
@Nullable Condition condition, @Nullable Transform transform, @Nullable String path) {
195202
this.throttlePeriod = throttlePeriod;
196203
this.condition = condition;
197204
this.transform = transform;
198205
this.action = action;
206+
this.path = path;
199207
}
200208

201209
@Override
@@ -215,6 +223,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
215223
.field(transform.type(), transform, params)
216224
.endObject();
217225
}
226+
if (path != null) {
227+
builder.field("foreach", path);
228+
}
218229
builder.field(action.type(), action, params);
219230
return builder.endObject();
220231
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ public final class WatcherIndexTemplateRegistryField {
1414
// version 7: add full exception stack traces for better debugging
1515
// version 8: fix slack attachment property not to be dynamic, causing field type issues
1616
// version 9: add a user field defining which user executed the watch
17+
// version 10: add support for foreach path in actions
1718
// Note: if you change this, also inform the kibana team around the watcher-ui
18-
public static final String INDEX_TEMPLATE_VERSION = "9";
19+
public static final String INDEX_TEMPLATE_VERSION = "10";
1920
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;
2021
public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION;
2122
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public final class WatchField {
1313
public static final ParseField CONDITION = new ParseField("condition");
1414
public static final ParseField ACTIONS = new ParseField("actions");
1515
public static final ParseField TRANSFORM = new ParseField("transform");
16+
public static final ParseField FOREACH = new ParseField("foreach");
1617
public static final ParseField THROTTLE_PERIOD = new ParseField("throttle_period_in_millis");
1718
public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period");
1819
public static final ParseField METADATA = new ParseField("metadata");

x-pack/plugin/core/src/main/resources/watch-history.json

+7
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,13 @@
264264
"reason" : {
265265
"type" : "keyword"
266266
},
267+
"number_of_actions_executed": {
268+
"type": "integer"
269+
},
270+
"foreach" : {
271+
"type": "object",
272+
"enabled" : false
273+
},
267274
"email": {
268275
"type": "object",
269276
"dynamic": true,

0 commit comments

Comments
 (0)