Skip to content

Commit 154d1dd

Browse files
authored
Watcher max_iterations with foreach action execution (#45715) (#46039)
Prior to this commit the foreach action execution had a hard coded limit to 100 iterations. This commit allows the max number of iterations to be a configuration ('max_iterations') on the foreach action. The default remains 100.
1 parent 956df7b commit 154d1dd

File tree

7 files changed

+82
-31
lines changed

7 files changed

+82
-31
lines changed

x-pack/docs/en/watcher/actions.asciidoc

+5-2
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,10 @@ image::images/action-throttling.jpg[align="center"]
198198
You can use the `foreach` field in an action to trigger the configured action
199199
for every element within that array.
200200

201-
In order to protect from long running watches, after one hundred runs with an
202-
foreach loop the execution is gracefully stopped.
201+
In order to protect from long running watches, you can use the `max_iterations`
202+
field to limit the maximum amount of runs that each watch executes. If this limit
203+
is reached, the execution is gracefully stopped. If not set, this field defaults
204+
to one hundred.
203205

204206
[source,js]
205207
--------------------------------------------------
@@ -224,6 +226,7 @@ PUT _watcher/watch/log_event_watch
224226
"actions" : {
225227
"log_hits" : {
226228
"foreach" : "ctx.payload.hits.hits", <1>
229+
"max_iterations" : 500,
227230
"logging" : {
228231
"text" : "Found id {{ctx.payload._id}} with field {{ctx.payload._source.my_field}}"
229232
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@
4848

4949
public class ActionWrapper implements ToXContentObject {
5050

51-
private final int MAXIMUM_FOREACH_RUNS = 100;
52-
5351
private String id;
5452
@Nullable
5553
private final ExecutableCondition condition;
@@ -59,18 +57,21 @@ public class ActionWrapper implements ToXContentObject {
5957
private final ExecutableAction<? extends Action> action;
6058
@Nullable
6159
private String path;
60+
private final Integer maxIterations;
6261

6362
public ActionWrapper(String id, ActionThrottler throttler,
6463
@Nullable ExecutableCondition condition,
6564
@Nullable ExecutableTransform<Transform, Transform.Result> transform,
6665
ExecutableAction<? extends Action> action,
67-
@Nullable String path) {
66+
@Nullable String path,
67+
@Nullable Integer maxIterations) {
6868
this.id = id;
6969
this.condition = condition;
7070
this.throttler = throttler;
7171
this.transform = transform;
7272
this.action = action;
7373
this.path = path;
74+
this.maxIterations = (maxIterations != null) ? maxIterations : 100;
7475
}
7576

7677
public String id() {
@@ -178,7 +179,7 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) {
178179
throw new ElasticsearchException("foreach object [{}] was an empty list, could not run any action", path);
179180
} else {
180181
for (Object o : collection) {
181-
if (runs >= MAXIMUM_FOREACH_RUNS) {
182+
if (runs >= maxIterations) {
182183
break;
183184
}
184185
if (o instanceof Map) {
@@ -217,6 +218,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
217218
builder.endObject();
218219
}
219220
builder.endArray();
221+
builder.field(WatchField.MAX_ITERATIONS.getPreferredName(), maxIterations);
220222
return builder;
221223
}
222224
});
@@ -280,7 +282,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
280282
}
281283
if (Strings.isEmpty(path) == false) {
282284
builder.field(WatchField.FOREACH.getPreferredName(), path);
285+
builder.field(WatchField.MAX_ITERATIONS.getPreferredName(), maxIterations);
283286
}
287+
284288
builder.field(action.type(), action, params);
285289
return builder.endObject();
286290
}
@@ -295,6 +299,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
295299
TimeValue throttlePeriod = null;
296300
String path = null;
297301
ExecutableAction<? extends Action> action = null;
302+
Integer maxIterations = null;
298303

299304
String currentFieldName = null;
300305
XContentParser.Token token;
@@ -317,6 +322,8 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
317322
throw new ElasticsearchParseException("could not parse action [{}/{}]. failed to parse field [{}] as time value",
318323
pe, watchId, actionId, currentFieldName);
319324
}
325+
} else if (WatchField.MAX_ITERATIONS.match(currentFieldName, parser.getDeprecationHandler())) {
326+
maxIterations = parser.intValue();
320327
} else {
321328
// it's the type of the action
322329
ActionFactory actionFactory = actionRegistry.factory(currentFieldName);
@@ -333,7 +340,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
333340
}
334341

335342
ActionThrottler throttler = new ActionThrottler(clock, throttlePeriod, licenseState);
336-
return new ActionWrapper(actionId, throttler, condition, transform, action, path);
343+
return new ActionWrapper(actionId, throttler, condition, transform, action, path, maxIterations);
337344
}
338345

339346
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public final class WatchField {
1414
public static final ParseField ACTIONS = new ParseField("actions");
1515
public static final ParseField TRANSFORM = new ParseField("transform");
1616
public static final ParseField FOREACH = new ParseField("foreach");
17+
public static final ParseField MAX_ITERATIONS = new ParseField("max_iterations");
1718
public static final ParseField THROTTLE_PERIOD = new ParseField("throttle_period_in_millis");
1819
public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period");
1920
public static final ParseField METADATA = new ParseField("metadata");

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java

+46-7
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.watcher.actions;
77

8+
import com.google.common.collect.ImmutableMap;
89
import org.elasticsearch.common.Strings;
910
import org.elasticsearch.common.xcontent.ToXContent;
1011
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -54,7 +55,7 @@ public class ActionWrapperTests extends ESTestCase {
5455
private Watch watch = mock(Watch.class);
5556
@SuppressWarnings("unchecked")
5657
private ExecutableAction<Action> executableAction = mock(ExecutableAction.class);
57-
private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction, null);
58+
private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction, null, null);
5859

5960
public void testThatUnmetActionConditionResetsAckStatus() throws Exception {
6061
WatchStatus watchStatus = new WatchStatus(now, Collections.singletonMap("_action", createActionStatus(State.ACKED)));
@@ -84,7 +85,7 @@ public void testThatMultipleResultsCanBeReturned() throws Exception {
8485
final ExecutableAction<LoggingAction> executableAction =
8586
new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine());
8687
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
87-
"ctx.payload.my_path");
88+
"ctx.payload.my_path", null);
8889

8990
WatchExecutionContext ctx = mockExecutionContent(watch);
9091

@@ -111,7 +112,7 @@ public void testThatMultipleResultsCanBeReturned() throws Exception {
111112

112113
public void testThatSpecifiedPathIsNotCollection() {
113114
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
114-
"ctx.payload.my_path");
115+
"ctx.payload.my_path", null);
115116
WatchExecutionContext ctx = mockExecutionContent(watch);
116117
Payload.Simple payload = new Payload.Simple(Collections.singletonMap("my_path", "not a map"));
117118
when(ctx.payload()).thenReturn(payload);
@@ -127,7 +128,7 @@ public void testThatSpecifiedPathIsNotCollection() {
127128

128129
public void testEmptyCollection() {
129130
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
130-
"ctx.payload.my_path");
131+
"ctx.payload.my_path", null);
131132
WatchExecutionContext ctx = mockExecutionContent(watch);
132133
Payload.Simple payload = new Payload.Simple(Collections.singletonMap("my_path", Collections.emptyList()));
133134
when(ctx.payload()).thenReturn(payload);
@@ -143,7 +144,7 @@ public void testEmptyCollection() {
143144

144145
public void testPartialFailure() throws Exception {
145146
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
146-
"ctx.payload.my_path");
147+
"ctx.payload.my_path", null);
147148
WatchExecutionContext ctx = mockExecutionContent(watch);
148149

149150
List<Map<String, String>> payloads = new ArrayList<>();
@@ -165,9 +166,9 @@ public void testPartialFailure() throws Exception {
165166
assertThat(result.action().status(), is(Action.Result.Status.PARTIAL_FAILURE));
166167
}
167168

168-
public void testLimitOfNumberOfActionsExecuted() throws Exception {
169+
public void testDefaultLimitOfNumberOfActionsExecuted() throws Exception {
169170
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
170-
"ctx.payload.my_path");
171+
"ctx.payload.my_path", null);
171172
WatchExecutionContext ctx = mockExecutionContent(watch);
172173
List<Map<String, String>> itemsPayload = new ArrayList<>();
173174
for (int i = 0; i < 101; i++) {
@@ -193,11 +194,49 @@ public void testLimitOfNumberOfActionsExecuted() throws Exception {
193194
assertThat(map.get("foreach"), instanceOf(List.class));
194195
List<Map<String, Object>> actions = (List) map.get("foreach");
195196
assertThat(actions, hasSize(100));
197+
assertThat(map, hasKey("max_iterations"));
198+
assertThat(map.get("max_iterations"), is(100));
196199
assertThat(map, hasKey("number_of_actions_executed"));
197200
assertThat(map.get("number_of_actions_executed"), is(100));
198201
}
199202
}
200203

204+
public void testConfiguredLimitOfNumberOfActionsExecuted() throws Exception {
205+
int randomMaxIterations = randomIntBetween(1, 1000);
206+
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
207+
"ctx.payload.my_path", randomMaxIterations);
208+
WatchExecutionContext ctx = mockExecutionContent(watch);
209+
List<Map<String, String>> itemsPayload = new ArrayList<>();
210+
for (int i = 0; i < randomMaxIterations + 1; i++) {
211+
final Action.Result actionResult = new LoggingAction.Result.Success("log_message " + i);;
212+
final Payload singleItemPayload = new Payload.Simple(ImmutableMap.of("key", String.valueOf(i)));
213+
itemsPayload.add(ImmutableMap.of("key", String.valueOf(i)));
214+
when(executableAction.execute(eq("_action"), eq(ctx), eq(singleItemPayload))).thenReturn(actionResult);
215+
}
216+
217+
Payload.Simple payload = new Payload.Simple(ImmutableMap.of("my_path", itemsPayload));
218+
when(ctx.payload()).thenReturn(payload);
219+
when(executableAction.logger()).thenReturn(logger);
220+
221+
ActionWrapperResult result = wrapper.execute(ctx);
222+
assertThat(result.action().status(), is(Action.Result.Status.SUCCESS));
223+
224+
// check that action toXContent contains all the results
225+
try (XContentBuilder builder = jsonBuilder()) {
226+
result.toXContent(builder, ToXContent.EMPTY_PARAMS);
227+
final String json = Strings.toString(builder);
228+
final Map<String, Object> map = XContentHelper.convertToMap(JsonXContent.jsonXContent, json, true);
229+
assertThat(map, hasKey("foreach"));
230+
assertThat(map.get("foreach"), instanceOf(List.class));
231+
List<Map<String, Object>> actions = (List) map.get("foreach");
232+
assertThat(actions, hasSize(randomMaxIterations));
233+
assertThat(map, hasKey("max_iterations"));
234+
assertThat(map.get("max_iterations"), is(randomMaxIterations));
235+
assertThat(map, hasKey("number_of_actions_executed"));
236+
assertThat(map.get("number_of_actions_executed"), is(randomMaxIterations));
237+
}
238+
}
239+
201240
private WatchExecutionContext mockExecutionContent(Watch watch) {
202241
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
203242
when(watch.id()).thenReturn("watchId");

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ public void testExecute() throws Exception {
227227
when(action.type()).thenReturn("MY_AWESOME_TYPE");
228228
when(action.execute("_action", context, payload)).thenReturn(actionResult);
229229

230-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
230+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
231231

232232
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
233233

@@ -313,7 +313,7 @@ public void testExecuteFailedInput() throws Exception {
313313
ExecutableAction action = mock(ExecutableAction.class);
314314
when(action.execute("_action", context, payload)).thenReturn(actionResult);
315315

316-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
316+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
317317
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
318318

319319
when(watch.input()).thenReturn(input);
@@ -378,7 +378,7 @@ public void testExecuteFailedCondition() throws Exception {
378378
ExecutableAction action = mock(ExecutableAction.class);
379379
when(action.execute("_action", context, payload)).thenReturn(actionResult);
380380

381-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
381+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
382382
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
383383

384384
when(watch.input()).thenReturn(input);
@@ -442,7 +442,7 @@ public void testExecuteFailedWatchTransform() throws Exception {
442442
ExecutableAction action = mock(ExecutableAction.class);
443443
when(action.execute("_action", context, payload)).thenReturn(actionResult);
444444

445-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
445+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
446446
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
447447

448448
when(watch.input()).thenReturn(input);
@@ -520,7 +520,7 @@ public void testExecuteFailedActionTransform() throws Exception {
520520
when(action.logger()).thenReturn(logger);
521521
when(action.execute("_action", context, payload)).thenReturn(actionResult);
522522

523-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
523+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
524524

525525
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
526526

@@ -600,7 +600,7 @@ public void testExecuteInner() throws Exception {
600600
ExecutableAction action = mock(ExecutableAction.class);
601601
when(action.execute("_action", context, payload)).thenReturn(actionResult);
602602

603-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
603+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
604604
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
605605
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
606606

@@ -649,7 +649,7 @@ public void testExecuteInnerThrottled() throws Exception {
649649

650650
ExecutableAction action = mock(ExecutableAction.class);
651651
when(action.type()).thenReturn("_type");
652-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
652+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
653653
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
654654
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
655655

@@ -712,7 +712,7 @@ public void testExecuteInnerConditionNotMet() throws Exception {
712712

713713
ExecutableAction action = mock(ExecutableAction.class);
714714
when(action.type()).thenReturn("_type");
715-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
715+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
716716
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
717717
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
718718

@@ -769,7 +769,7 @@ public void testExecuteInnerConditionNotMetDueToException() throws Exception {
769769
ExecutableAction action = mock(ExecutableAction.class);
770770
when(action.type()).thenReturn("_type");
771771
when(action.logger()).thenReturn(logger);
772-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
772+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
773773
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
774774
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
775775

@@ -817,7 +817,7 @@ public void testExecuteConditionNotMet() throws Exception {
817817
ExecutableCondition actionCondition = mock(ExecutableCondition.class);
818818
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
819819
ExecutableAction action = mock(ExecutableAction.class);
820-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
820+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
821821

822822
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
823823
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
@@ -946,7 +946,7 @@ public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exce
946946
when(action.type()).thenReturn("MY_AWESOME_TYPE");
947947
when(action.execute("_action", context, payload)).thenReturn(actionResult);
948948

949-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action, null);
949+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action, null, null);
950950

951951
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
952952

0 commit comments

Comments
 (0)