Skip to content

Commit 54efef5

Browse files
committed
Watcher: Allow to execute actions for each element in array
This adds the ability to execute an action for each element that occurs in an array, for example you could sent a dedicated slack action for each search hit returned from a search. Relates elastic#34546
1 parent f2a558d commit 54efef5

File tree

11 files changed

+336
-30
lines changed

11 files changed

+336
-30
lines changed

x-pack/docs/en/watcher/actions.asciidoc

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,48 @@ of a watch during its execution:
192192
image::images/action-throttling.jpg[align="center"]
193193

194194

195+
[[action-foreach]]
196+
=== Running an action for each element in an array
197+
198+
You can use the `foreach` field in an action to trigger the configured action
199+
for every element within that array. Each element within that array has to be a
200+
map.
201+
202+
[source,js]
203+
--------------------------------------------------
204+
PUT _watcher/watch/log_event_watch
205+
{
206+
"trigger" : {
207+
"schedule" : { "interval" : "5m" }
208+
},
209+
"input" : {
210+
"search" : {
211+
"request" : {
212+
"indices" : "log-events",
213+
"body" : {
214+
"size" : 0,
215+
"query" : { "match" : { "status" : "error" } }
216+
}
217+
}
218+
}
219+
},
220+
"condition" : {
221+
"compare" : { "ctx.payload.hits.total.value" : { "gt" : 0 } }
222+
},
223+
"actions" : {
224+
"log_hits" : {
225+
"foreach" : "ctx.payload.hits.hits", <1>
226+
"logging" : {
227+
"text" : "Found id {{_id}} with field {{_source.my_field}}"
228+
}
229+
}
230+
}
231+
}
232+
--------------------------------------------------
233+
// CONSOLE
234+
235+
<1> The logging statement will be executed for each of the returned search hits.
236+
195237
[[action-conditions]]
196238
=== Adding conditions to actions
197239

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java

Lines changed: 78 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77

88
import org.apache.logging.log4j.message.ParameterizedMessage;
99
import org.apache.logging.log4j.util.Supplier;
10+
import org.elasticsearch.ElasticsearchException;
1011
import org.elasticsearch.ElasticsearchParseException;
1112
import org.elasticsearch.common.Nullable;
13+
import org.elasticsearch.common.Strings;
1214
import org.elasticsearch.common.unit.TimeValue;
15+
import org.elasticsearch.common.xcontent.ObjectPath;
1316
import org.elasticsearch.common.xcontent.ToXContentObject;
1417
import org.elasticsearch.common.xcontent.XContentBuilder;
1518
import org.elasticsearch.common.xcontent.XContentParser;
@@ -30,7 +33,13 @@
3033
import java.time.Clock;
3134
import java.time.ZoneOffset;
3235
import java.time.ZonedDateTime;
36+
import java.util.ArrayList;
37+
import java.util.Collection;
38+
import java.util.List;
39+
import java.util.Map;
3340
import java.util.Objects;
41+
import java.util.Set;
42+
import java.util.stream.Collectors;
3443

3544
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
3645

@@ -43,16 +52,20 @@ public class ActionWrapper implements ToXContentObject {
4352
private final ExecutableTransform<Transform, Transform.Result> transform;
4453
private final ActionThrottler throttler;
4554
private final ExecutableAction<? extends Action> action;
55+
@Nullable
56+
private String path;
4657

4758
public ActionWrapper(String id, ActionThrottler throttler,
4859
@Nullable ExecutableCondition condition,
4960
@Nullable ExecutableTransform<Transform, Transform.Result> transform,
50-
ExecutableAction<? extends Action> action) {
61+
ExecutableAction<? extends Action> action,
62+
@Nullable String path) {
5163
this.id = id;
5264
this.condition = condition;
5365
this.throttler = throttler;
5466
this.transform = transform;
5567
this.action = action;
68+
this.path = path;
5669
}
5770

5871
public String id() {
@@ -140,13 +153,64 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) {
140153
return new ActionWrapperResult(id, conditionResult, null, new Action.Result.FailureWithException(action.type(), e));
141154
}
142155
}
143-
try {
144-
Action.Result actionResult = action.execute(id, ctx, payload);
145-
return new ActionWrapperResult(id, conditionResult, transformResult, actionResult);
146-
} catch (Exception e) {
147-
action.logger().error(
156+
if (Strings.isEmpty(path)) {
157+
try {
158+
Action.Result actionResult = action.execute(id, ctx, payload);
159+
return new ActionWrapperResult(id, conditionResult, transformResult, actionResult);
160+
} catch (Exception e) {
161+
action.logger().error(
162+
(Supplier<?>) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e);
163+
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
164+
}
165+
} else {
166+
try {
167+
List<Action.Result> results = new ArrayList<>();
168+
Object object = ObjectPath.eval(path, ctx.payload().data());
169+
if (object instanceof Collection) {
170+
Collection collection = Collection.class.cast(object);
171+
if (collection.isEmpty()) {
172+
throw new ElasticsearchException("foreach object [{}] was an empty list, could not run any action", path);
173+
} else {
174+
for (Object o : collection) {
175+
if (o instanceof Map) {
176+
results.add(action.execute(id, ctx, new Payload.Simple((Map<String, Object>) o)));
177+
} else {
178+
throw new ElasticsearchException("item in foreach [{}] object was not a map", path);
179+
}
180+
}
181+
}
182+
} else {
183+
throw new ElasticsearchException("specified foreach object was not a an array/collection: [{}]", path);
184+
}
185+
186+
// check if we have mixed results, then set to partial failure
187+
final Set<Action.Result.Status> statuses = results.stream().map(Action.Result::status).collect(Collectors.toSet());
188+
Action.Result.Status status;
189+
if (statuses.size() == 1) {
190+
status = statuses.iterator().next();
191+
} else {
192+
status = Action.Result.Status.PARTIAL_FAILURE;
193+
}
194+
195+
return new ActionWrapperResult(id, conditionResult, transformResult,
196+
new Action.Result(action.type(), status) {
197+
@Override
198+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
199+
builder.startArray(WatchField.FOREACH.getPreferredName());
200+
for (Action.Result result : results) {
201+
builder.startObject();
202+
result.toXContent(builder, params);
203+
builder.endObject();
204+
}
205+
builder.endArray();
206+
return builder;
207+
}
208+
});
209+
} catch (Exception e) {
210+
action.logger().error(
148211
(Supplier<?>) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e);
149-
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
212+
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
213+
}
150214
}
151215
}
152216

@@ -186,6 +250,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
186250
.field(transform.type(), transform, params)
187251
.endObject();
188252
}
253+
if (Strings.isEmpty(path) == false) {
254+
builder.field(WatchField.FOREACH.getPreferredName(), path);
255+
}
189256
builder.field(action.type(), action, params);
190257
return builder.endObject();
191258
}
@@ -198,6 +265,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
198265
ExecutableCondition condition = null;
199266
ExecutableTransform<Transform, Transform.Result> transform = null;
200267
TimeValue throttlePeriod = null;
268+
String path = null;
201269
ExecutableAction<? extends Action> action = null;
202270

203271
String currentFieldName = null;
@@ -208,6 +276,8 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
208276
} else {
209277
if (WatchField.CONDITION.match(currentFieldName, parser.getDeprecationHandler())) {
210278
condition = actionRegistry.getConditionRegistry().parseExecutable(watchId, parser);
279+
} else if (WatchField.FOREACH.match(currentFieldName, parser.getDeprecationHandler())) {
280+
path = parser.text();
211281
} else if (Transform.TRANSFORM.match(currentFieldName, parser.getDeprecationHandler())) {
212282
transform = actionRegistry.getTransformRegistry().parse(watchId, parser);
213283
} else if (ThrottlerField.THROTTLE_PERIOD.match(currentFieldName, parser.getDeprecationHandler())) {
@@ -235,7 +305,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
235305
}
236306

237307
ActionThrottler throttler = new ActionThrottler(clock, throttlePeriod, licenseState);
238-
return new ActionWrapper(actionId, throttler, condition, transform, action);
308+
return new ActionWrapper(actionId, throttler, condition, transform, action, path);
239309
}
240310

241311
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/client/WatchSourceBuilder.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Transfo
101101
}
102102

103103
public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Transform transform, Action action) {
104-
actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform));
104+
actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform, null));
105105
return this;
106106
}
107107

@@ -111,7 +111,13 @@ public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Conditi
111111
}
112112

113113
public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, Action action) {
114-
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform));
114+
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform, null));
115+
return this;
116+
}
117+
118+
public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, String path,
119+
Action action) {
120+
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform, path));
115121
return this;
116122
}
117123

@@ -186,16 +192,18 @@ public final BytesReference buildAsBytes(XContentType contentType) {
186192
static class TransformedAction implements ToXContentObject {
187193

188194
private final Action action;
195+
@Nullable private String path;
189196
@Nullable private final TimeValue throttlePeriod;
190197
@Nullable private final Condition condition;
191198
@Nullable private final Transform transform;
192199

193200
TransformedAction(String id, Action action, @Nullable TimeValue throttlePeriod,
194-
@Nullable Condition condition, @Nullable Transform transform) {
201+
@Nullable Condition condition, @Nullable Transform transform, @Nullable String path) {
195202
this.throttlePeriod = throttlePeriod;
196203
this.condition = condition;
197204
this.transform = transform;
198205
this.action = action;
206+
this.path = path;
199207
}
200208

201209
@Override
@@ -215,6 +223,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
215223
.field(transform.type(), transform, params)
216224
.endObject();
217225
}
226+
if (path != null) {
227+
builder.field("foreach", path);
228+
}
218229
builder.field(action.type(), action, params);
219230
return builder.endObject();
220231
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ public final class WatcherIndexTemplateRegistryField {
1414
// version 7: add full exception stack traces for better debugging
1515
// version 8: fix slack attachment property not to be dynamic, causing field type issues
1616
// version 9: add a user field defining which user executed the watch
17+
// version 10: add support for foreach path in actions
1718
// Note: if you change this, also inform the kibana team around the watcher-ui
18-
public static final String INDEX_TEMPLATE_VERSION = "9";
19+
public static final String INDEX_TEMPLATE_VERSION = "10";
1920
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;
2021
public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION;
2122
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public final class WatchField {
1313
public static final ParseField CONDITION = new ParseField("condition");
1414
public static final ParseField ACTIONS = new ParseField("actions");
1515
public static final ParseField TRANSFORM = new ParseField("transform");
16+
public static final ParseField FOREACH = new ParseField("foreach");
1617
public static final ParseField THROTTLE_PERIOD = new ParseField("throttle_period_in_millis");
1718
public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period");
1819
public static final ParseField METADATA = new ParseField("metadata");

x-pack/plugin/core/src/main/resources/watch-history.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,10 @@
264264
"reason" : {
265265
"type" : "keyword"
266266
},
267+
"foreach" : {
268+
"type": "object",
269+
"enabled" : false
270+
},
267271
"email": {
268272
"type": "object",
269273
"dynamic": true,
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
---
2+
setup:
3+
- do:
4+
cluster.health:
5+
wait_for_status: yellow
6+
7+
---
8+
teardown:
9+
- do:
10+
watcher.delete_watch:
11+
id: "test_watch"
12+
ignore: 404
13+
14+
---
15+
"Test execute watch api with foreach action":
16+
- do:
17+
watcher.execute_watch:
18+
body: >
19+
{
20+
"watch" : {
21+
"trigger": {
22+
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
23+
},
24+
"input": {
25+
"simple": {
26+
"my_array" : [
27+
{ "_doc" : { "_index" : "my_test_index", "_id" : "first", "key" : "first" } },
28+
{ "_doc" : { "_index" : "my_test_index", "_id" : "second", "key" : "second" } }
29+
]
30+
}
31+
},
32+
"actions": {
33+
"indexme" : {
34+
"foreach" : "my_array",
35+
"index" : {}
36+
}
37+
}
38+
}
39+
}
40+
41+
- match: { watch_record.trigger_event.type: "manual" }
42+
- match: { watch_record.state: "executed" }
43+
- match: { watch_record.status.execution_state: "executed" }
44+
45+
- do:
46+
mget:
47+
body:
48+
docs:
49+
- { _index: my_test_index, _id: "first"}
50+
- { _index: my_test_index, _id: "second"}
51+
52+
- is_true: docs.0.found
53+
- match: { docs.0._source: { "key": "first" }}
54+
- is_true: docs.1.found
55+
- match: { docs.1._source: { "key": "second" }}

0 commit comments

Comments
 (0)