Skip to content

Commit b03a794

Browse files
Support runtime fields on transforms (#5377) (#5394)
Co-authored-by: Steve Gordon <[email protected]>
1 parent 893449b commit b03a794

File tree

2 files changed

+178
-0
lines changed

2 files changed

+178
-0
lines changed

src/Nest/XPack/Transform/TransformSource.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ public interface ITransformSource
2323
/// </summary>
2424
[DataMember(Name = "query")]
2525
QueryContainer Query { get; set; }
26+
27+
/// <summary>
28+
/// Specifies runtime fields which exist only as part of the query.
29+
/// </summary>
30+
[DataMember(Name = "runtime_mappings")]
31+
IRuntimeFields RuntimeFields { get; set; }
2632
}
2733

2834
/// <inheritdoc />
@@ -34,13 +40,17 @@ public class TransformSource
3440

3541
/// <inheritdoc />
3642
public QueryContainer Query { get; set; }
43+
44+
/// <inheritdoc />
45+
public IRuntimeFields RuntimeFields { get; set; }
3746
}
3847

3948
/// <inheritdoc cref="ITransformSource" />
4049
public class TransformSourceDescriptor<T> : DescriptorBase<TransformSourceDescriptor<T>, ITransformSource>, ITransformSource where T : class
4150
{
4251
Indices ITransformSource.Index { get; set; }
4352
QueryContainer ITransformSource.Query { get; set; }
53+
IRuntimeFields ITransformSource.RuntimeFields { get; set; }
4454

4555
/// <inheritdoc cref="ITransformSource.Index" />
4656
public TransformSourceDescriptor<T> Index(Indices indices) => Assign(indices, (a, v) => a.Index = v);
@@ -52,5 +62,12 @@ public class TransformSourceDescriptor<T> : DescriptorBase<TransformSourceDescri
5262
public TransformSourceDescriptor<T> Query(Func<QueryContainerDescriptor<T>, QueryContainer> selector) =>
5363
Assign(selector, (a, v) => a.Query = v?.Invoke(new QueryContainerDescriptor<T>()));
5464

65+
/// <inheritdoc cref="ITransformSource.RuntimeFields" />
66+
public TransformSourceDescriptor<T> RuntimeFields(Func<RuntimeFieldsDescriptor<T>, IPromise<IRuntimeFields>> runtimeFieldsSelector) =>
67+
Assign(runtimeFieldsSelector, (a, v) => a.RuntimeFields = v?.Invoke(new RuntimeFieldsDescriptor<T>())?.Value);
68+
69+
/// <inheritdoc cref="ITransformSource.RuntimeFields" />
70+
public TransformSourceDescriptor<T> RuntimeFields<TSource>(Func<RuntimeFieldsDescriptor<TSource>, IPromise<IRuntimeFields>> runtimeFieldsSelector) where TSource : class =>
71+
Assign(runtimeFieldsSelector, (a, v) => a.RuntimeFields = v?.Invoke(new RuntimeFieldsDescriptor<TSource>())?.Value);
5572
}
5673
}

tests/Tests/XPack/Transform/TransformApiWithSettingsTests.cs

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,4 +311,165 @@ protected override LazyResponses ClientUsage() =>
311311
.MaxPageSearchSize(200)
312312
.DocsPerSecond(200));
313313
}
314+
315+
[SkipVersion("<7.12.0", "Settings introduced in 7.12.0")]
316+
public class TransformApiWithRuntimeFieldsTests
317+
: ApiIntegrationTestBase<WritableCluster, PreviewTransformResponse<ProjectTransform>, IPreviewTransformRequest, PreviewTransformDescriptor<Project>, PreviewTransformRequest>
318+
{
319+
private const string RuntimeFieldName = "search_runtime_field";
320+
private const string RuntimeFieldScript = "if (doc['type'].size() != 0) {emit(doc['type'].value.toUpperCase())}";
321+
322+
public TransformApiWithRuntimeFieldsTests(WritableCluster cluster, EndpointUsage usage) : base(cluster, usage) { }
323+
324+
protected override LazyResponses ClientUsage() => Calls(
325+
(client, f) => client.Transform.Preview<Project, ProjectTransform>(f),
326+
(client, f) => client.Transform.PreviewAsync<Project, ProjectTransform>(f),
327+
(client, r) => client.Transform.Preview<ProjectTransform>(r),
328+
(client, r) => client.Transform.PreviewAsync<ProjectTransform>(r));
329+
330+
protected override HttpMethod HttpMethod => HttpMethod.POST;
331+
protected override string UrlPath => "_transform/_preview";
332+
protected override bool ExpectIsValid => true;
333+
protected override int ExpectStatusCode => 200;
334+
protected override bool SupportsDeserialization => false;
335+
336+
protected override object ExpectJson =>
337+
new
338+
{
339+
description = CallIsolatedValue,
340+
frequency = "1s",
341+
source = new {
342+
index = new[] { "project" },
343+
query = new { match_all = new { } },
344+
runtime_mappings = new
345+
{
346+
search_runtime_field = new
347+
{
348+
script = new
349+
{
350+
lang = "painless",
351+
source = RuntimeFieldScript
352+
},
353+
type = "keyword"
354+
}
355+
}
356+
},
357+
dest = new { index = $"transform-{CallIsolatedValue}" },
358+
pivot = new
359+
{
360+
aggregations = new
361+
{
362+
averageCommits = new
363+
{
364+
avg = new
365+
{
366+
field = "numberOfCommits"
367+
}
368+
},
369+
sumIntoMaster = new
370+
{
371+
scripted_metric = new
372+
{
373+
combine_script = new
374+
{
375+
source = "long sum = 0; for (s in state.masterCommits) { sum += s } return sum"
376+
},
377+
init_script = new
378+
{
379+
source = "state.masterCommits = []"
380+
},
381+
map_script = new
382+
{
383+
source = "state.masterCommits.add(doc['branches.keyword'].contains('master')? 1 : 0)"
384+
},
385+
reduce_script = new
386+
{
387+
source = "long sum = 0; for (s in states) { sum += s } return sum"
388+
}
389+
}
390+
}
391+
},
392+
group_by = new { type = new { terms = new { field = "search_runtime_field" } } }
393+
},
394+
sync = new
395+
{
396+
time = new
397+
{
398+
field = "lastActivity"
399+
}
400+
}
401+
};
402+
403+
protected override PreviewTransformRequest Initializer => new()
404+
{
405+
Description = CallIsolatedValue,
406+
Frequency = "1s",
407+
Source = new TransformSource { Index = Index<Project>(), Query = new MatchAllQuery(),
408+
RuntimeFields = new RuntimeFields
409+
{
410+
{ RuntimeFieldName, new RuntimeField
411+
{
412+
Type = FieldType.Keyword,
413+
Script = new PainlessScript(RuntimeFieldScript)
414+
}
415+
}
416+
}
417+
},
418+
Destination = new TransformDestination { Index = $"transform-{CallIsolatedValue}" },
419+
Pivot = new TransformPivot
420+
{
421+
Aggregations =
422+
new AverageAggregation("averageCommits", Field<Project>(f => f.NumberOfCommits)) &&
423+
new ScriptedMetricAggregation("sumIntoMaster")
424+
{
425+
InitScript = new InlineScript("state.masterCommits = []"),
426+
MapScript = new InlineScript("state.masterCommits.add(doc['branches.keyword'].contains('master')? 1 : 0)"),
427+
CombineScript = new InlineScript("long sum = 0; for (s in state.masterCommits) { sum += s } return sum"),
428+
ReduceScript = new InlineScript("long sum = 0; for (s in states) { sum += s } return sum")
429+
},
430+
GroupBy = new Dictionary<string, ISingleGroupSource>
431+
{
432+
{
433+
"type",
434+
new TermsGroupSource { Field = "search_runtime_field"}
435+
}
436+
}
437+
438+
},
439+
Sync = new TransformSyncContainer(new TransformTimeSync { Field = Field<Project>(f => f.LastActivity) })
440+
};
441+
442+
protected override Func<PreviewTransformDescriptor<Project>, IPreviewTransformRequest> Fluent => f => f
443+
.Description(CallIsolatedValue)
444+
.Frequency(new Time(1, TimeUnit.Second))
445+
.Source(s => s
446+
.Index<Project>()
447+
.Query(q => q.MatchAll())
448+
.RuntimeFields(rtf => rtf.RuntimeField(RuntimeFieldName, FieldType.Keyword, r => r.Script(RuntimeFieldScript)))
449+
)
450+
.Destination(de => de
451+
.Index($"transform-{CallIsolatedValue}")
452+
)
453+
.Pivot(p => p
454+
.Aggregations(a => a
455+
.Average("averageCommits", avg => avg
456+
.Field(fld => fld.NumberOfCommits)
457+
)
458+
.ScriptedMetric("sumIntoMaster", sm => sm
459+
.InitScript("state.masterCommits = []")
460+
.MapScript("state.masterCommits.add(doc['branches.keyword'].contains('master')? 1 : 0)")
461+
.CombineScript("long sum = 0; for (s in state.masterCommits) { sum += s } return sum")
462+
.ReduceScript("long sum = 0; for (s in states) { sum += s } return sum")
463+
)
464+
)
465+
.GroupBy(g => g
466+
.Terms("type", t => t.Field("search_runtime_field"))
467+
)
468+
)
469+
.Sync(sy => sy
470+
.Time(t => t
471+
.Field(fld => fld.LastActivity)
472+
)
473+
);
474+
}
314475
}

0 commit comments

Comments
 (0)