Skip to content

Commit 035d1ef

Browse files
committed
Allow execution of each Watcher action in array (#4068)
* Allow execution of each Watcher action in array Relates: #4001 This commit introduces Foreach on Watcher actions, to allow execution of the Watcher action on each element in an array specified by the path assigned to foreach. Add condition to Watcher actions and ensure condition and transform are serialized. Add integration test for Action foreach, transform and condition. (cherry picked from commit 8acde7e)
1 parent 5387851 commit 035d1ef

File tree

3 files changed

+258
-2
lines changed

3 files changed

+258
-2
lines changed

src/Nest/XPack/Watcher/Action/ActionBase.cs

+89-2
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,57 @@
77

88
namespace Nest
99
{
10+
/// <summary>
11+
/// A Watcher action
12+
/// </summary>
1013
[InterfaceDataContract]
1114
public interface IAction
1215
{
16+
/// <summary>
17+
/// The type of action
18+
/// </summary>
1319
[IgnoreDataMember]
1420
ActionType ActionType { get; }
1521

22+
/// <summary>
23+
/// The name of the action
24+
/// </summary>
1625
[IgnoreDataMember]
1726
string Name { get; set; }
1827

28+
/// <summary>
29+
/// Limit how often an action is executed, after it has been executed.
30+
/// When a throttling period is set, repeated executions of the action are prevented if it has already
31+
/// executed within the throttling period time frame (now - throttling period).
32+
/// </summary>
1933
[IgnoreDataMember]
2034
Time ThrottlePeriod { get; set; }
2135

22-
[DataMember(Name = "transform")]
36+
/// <summary>
37+
/// Trigger the configured action for every element within an array
38+
/// defined by the path assigned.
39+
/// <para />
40+
/// Valid only in Elasticsearch 7.3.0+
41+
/// </summary>
42+
[IgnoreDataMember]
43+
string Foreach { get; set; }
44+
45+
/// <summary>
46+
/// Transforms the payload before executing the action. The transformation is only applied
47+
/// for the payload for this action.
48+
/// </summary>
49+
[IgnoreDataMember]
2350
TransformContainer Transform { get; set; }
51+
52+
/// <summary>
53+
/// A condition for the action. Allows a single watch to specify multiple actions, but
54+
/// further control when each action will be executed.
55+
/// </summary>
56+
[IgnoreDataMember]
57+
ConditionContainer Condition { get; set; }
2458
}
2559

60+
/// <inheritdoc />
2661
public abstract class ActionBase : IAction
2762
{
2863
internal ActionBase() { }
@@ -31,12 +66,21 @@ internal ActionBase() { }
3166

3267
public abstract ActionType ActionType { get; }
3368

69+
/// <inheritdoc />
3470
public string Name { get; set; }
3571

72+
/// <inheritdoc />
3673
public Time ThrottlePeriod { get; set; }
3774

75+
/// <inheritdoc />
76+
public string Foreach { get; set; }
77+
78+
/// <inheritdoc />
3879
public TransformContainer Transform { get; set; }
3980

81+
/// <inheritdoc />
82+
public ConditionContainer Condition { get; set; }
83+
4084
public static bool operator false(ActionBase a) => false;
4185

4286
public static bool operator true(ActionBase a) => false;
@@ -78,7 +122,10 @@ internal class ActionsFormatter : IJsonFormatter<Actions>
78122
{ "index", 3 },
79123
{ "logging", 4 },
80124
{ "slack", 5 },
81-
{ "pagerduty", 6 }
125+
{ "pagerduty", 6 },
126+
{ "foreach", 7 },
127+
{ "transform", 8 },
128+
{ "condition", 9 }
82129
};
83130

84131
public Actions Deserialize(ref JsonReader reader, IJsonFormatterResolver formatterResolver)
@@ -92,6 +139,10 @@ public Actions Deserialize(ref JsonReader reader, IJsonFormatterResolver formatt
92139

93140
Time throttlePeriod = null;
94141
IAction action = null;
142+
string @foreach = null;
143+
TransformContainer transform = null;
144+
ConditionContainer condition = null;
145+
95146
while (reader.ReadIsInObject(ref actionCount))
96147
{
97148
var propertyName = reader.ReadPropertyNameSegmentRaw();
@@ -128,6 +179,17 @@ public Actions Deserialize(ref JsonReader reader, IJsonFormatterResolver formatt
128179
action = formatterResolver.GetFormatter<PagerDutyAction>()
129180
.Deserialize(ref reader, formatterResolver);
130181
break;
182+
case 7:
183+
@foreach = reader.ReadString();
184+
break;
185+
case 8:
186+
transform = formatterResolver.GetFormatter<TransformContainer>()
187+
.Deserialize(ref reader, formatterResolver);
188+
break;
189+
case 9:
190+
condition = formatterResolver.GetFormatter<ConditionContainer>()
191+
.Deserialize(ref reader, formatterResolver);
192+
break;
131193
}
132194
}
133195
else
@@ -138,6 +200,9 @@ public Actions Deserialize(ref JsonReader reader, IJsonFormatterResolver formatt
138200
{
139201
action.Name = name;
140202
action.ThrottlePeriod = throttlePeriod;
203+
action.Foreach = @foreach;
204+
action.Transform = transform;
205+
action.Condition = condition;
141206
dictionary.Add(name, action);
142207
}
143208
}
@@ -166,6 +231,28 @@ public void Serialize(ref JsonWriter writer, Actions value, IJsonFormatterResolv
166231
timeFormatter.Serialize(ref writer, action.ThrottlePeriod, formatterResolver);
167232
writer.WriteValueSeparator();
168233
}
234+
235+
if (!string.IsNullOrEmpty(action.Foreach))
236+
{
237+
writer.WritePropertyName("foreach");
238+
writer.WriteString(action.Foreach);
239+
writer.WriteValueSeparator();
240+
}
241+
242+
if (action.Transform != null)
243+
{
244+
writer.WritePropertyName("transform");
245+
formatterResolver.GetFormatter<TransformContainer>().Serialize(ref writer, action.Transform, formatterResolver);
246+
writer.WriteValueSeparator();
247+
}
248+
249+
if (action.Condition != null)
250+
{
251+
writer.WritePropertyName("condition");
252+
formatterResolver.GetFormatter<ConditionContainer>().Serialize(ref writer, action.Condition, formatterResolver);
253+
writer.WriteValueSeparator();
254+
}
255+
169256
writer.WritePropertyName(kvp.Value.ActionType.GetStringValue());
170257

171258
switch (action.ActionType)

src/Nest/XPack/Watcher/Action/ActionsDescriptorBase.cs

+12
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Nest
44
{
5+
/// <inheritdoc cref="IAction"/>
56
public abstract class ActionsDescriptorBase<TDescriptor, TInterface>
67
: DescriptorBase<TDescriptor, TInterface>, IAction
78
where TDescriptor : DescriptorBase<TDescriptor, TInterface>, TInterface
@@ -29,10 +30,21 @@ string IAction.Name
2930

3031
Time IAction.ThrottlePeriod { get; set; }
3132
TransformContainer IAction.Transform { get; set; }
33+
ConditionContainer IAction.Condition { get; set; }
34+
string IAction.Foreach { get; set; }
3235

36+
/// <inheritdoc cref="IAction.Transform"/>
3337
public TDescriptor Transform(Func<TransformDescriptor, TransformContainer> selector) =>
3438
Assign(selector.InvokeOrDefault(new TransformDescriptor()), (a, v) => a.Transform = v);
3539

40+
/// <inheritdoc cref="IAction.Condition"/>
41+
public TDescriptor Condition(Func<ConditionDescriptor, ConditionContainer> selector) =>
42+
Assign(selector.InvokeOrDefault(new ConditionDescriptor()), (a, v) => a.Condition = v);
43+
44+
/// <inheritdoc cref="IAction.ThrottlePeriod"/>
3645
public TDescriptor ThrottlePeriod(Time throttlePeriod) => Assign(throttlePeriod, (a, v) => a.ThrottlePeriod = v);
46+
47+
/// <inheritdoc cref="IAction.Foreach"/>
48+
public TDescriptor Foreach(string @foreach) => Assign(@foreach, (a, v) => a.Foreach = v);
3749
}
3850
}

src/Tests/Tests/XPack/Watcher/PutWatch/PutWatchApiTests.cs

+157
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using Elastic.Xunit.XunitPlumbing;
45
using Elasticsearch.Net;
56
using FluentAssertions;
67
using Nest;
@@ -710,4 +711,160 @@ protected override void ExpectResponse(PutWatchResponse response)
710711
response.Id.Should().Be(CallIsolatedValue);
711712
}
712713
}
714+
715+
[SkipVersion("<7.3.0", "Foreach introduced in 7.3.0")]
716+
public class PutWatchApiWithForeachTests : ApiIntegrationTestBase<XPackCluster, PutWatchResponse, IPutWatchRequest, PutWatchDescriptor, PutWatchRequest>
717+
{
718+
public PutWatchApiWithForeachTests(XPackCluster cluster, EndpointUsage usage) : base(cluster, usage) { }
719+
720+
protected override bool ExpectIsValid => true;
721+
722+
protected override object ExpectJson =>
723+
new
724+
{
725+
input = new
726+
{
727+
search = new
728+
{
729+
request = new
730+
{
731+
indices = new[] { "project" },
732+
body = new
733+
{
734+
query = new
735+
{
736+
range = new
737+
{
738+
numberOfCommits = new
739+
{
740+
gt = 10.0
741+
}
742+
}
743+
}
744+
}
745+
}
746+
}
747+
},
748+
trigger = new
749+
{
750+
schedule = new
751+
{
752+
interval = "5m"
753+
}
754+
},
755+
actions = new
756+
{
757+
log_hits = new
758+
{
759+
@foreach = "ctx.payload.hits.hits",
760+
logging = new
761+
{
762+
text = "Found id {{ctx.payload._id}} with field {{ctx.payload._source.numberOfCommits}}"
763+
},
764+
transform = new
765+
{
766+
script = new
767+
{
768+
source = "return [ 'time' : ctx.trigger.scheduled_time ]"
769+
}
770+
},
771+
condition = new
772+
{
773+
always = new {}
774+
}
775+
}
776+
}
777+
};
778+
779+
protected override int ExpectStatusCode => 201;
780+
781+
protected override HttpMethod HttpMethod => HttpMethod.PUT;
782+
783+
protected override Func<PutWatchDescriptor, IPutWatchRequest> Fluent => p => p
784+
.Input(i => i
785+
.Search(s => s
786+
.Request(si => si
787+
.Indices<Project>()
788+
.Body<Project>(b => b
789+
.Query(q => q
790+
.Range(r => r
791+
.Field(f => f.NumberOfCommits)
792+
.GreaterThan(10)
793+
)
794+
)
795+
)
796+
)
797+
)
798+
)
799+
.Trigger(t => t
800+
.Schedule(s => s
801+
.Interval(new Interval(5, IntervalUnit.Minute))
802+
)
803+
)
804+
.Actions(a => a
805+
.Logging("log_hits", i => i
806+
.Foreach("ctx.payload.hits.hits")
807+
.Text("Found id {{ctx.payload._id}} with field {{ctx.payload._source.numberOfCommits}}")
808+
.Transform(t => t
809+
.Script(st =>st
810+
.Source("return [ 'time' : ctx.trigger.scheduled_time ]")
811+
)
812+
)
813+
.Condition(c => c
814+
.Always()
815+
)
816+
)
817+
);
818+
819+
protected override PutWatchRequest Initializer =>
820+
new PutWatchRequest(CallIsolatedValue)
821+
{
822+
Input = new SearchInput
823+
{
824+
Request = new SearchInputRequest
825+
{
826+
Indices = new IndexName[] { typeof(Project) },
827+
Body = new SearchRequest<Project>
828+
{
829+
Query = new NumericRangeQuery
830+
{
831+
Field = Infer.Field<Project>(f => f.NumberOfCommits),
832+
GreaterThan = 10
833+
}
834+
}
835+
}
836+
},
837+
Trigger = new ScheduleContainer
838+
{
839+
Interval = new Interval(5, IntervalUnit.Minute)
840+
},
841+
Actions = new LoggingAction("log_hits")
842+
{
843+
Foreach = "ctx.payload.hits.hits",
844+
Text = "Found id {{ctx.payload._id}} with field {{ctx.payload._source.numberOfCommits}}",
845+
Transform = new InlineScriptTransform("return [ 'time' : ctx.trigger.scheduled_time ]"),
846+
Condition = new AlwaysCondition()
847+
}
848+
};
849+
850+
protected override bool SupportsDeserialization => false;
851+
852+
protected override string UrlPath => $"/_watcher/watch/{CallIsolatedValue}";
853+
854+
protected override LazyResponses ClientUsage() => Calls(
855+
(client, f) => client.Watcher.Put(CallIsolatedValue, f),
856+
(client, f) => client.Watcher.PutAsync(CallIsolatedValue, f),
857+
(client, r) => client.Watcher.Put(r),
858+
(client, r) => client.Watcher.PutAsync(r)
859+
);
860+
861+
protected override PutWatchDescriptor NewDescriptor() => new PutWatchDescriptor(CallIsolatedValue);
862+
863+
protected override void ExpectResponse(PutWatchResponse response)
864+
{
865+
response.Created.Should().BeTrue();
866+
response.Version.Should().Be(1);
867+
response.Id.Should().Be(CallIsolatedValue);
868+
}
869+
}
713870
}

0 commit comments

Comments
 (0)