Skip to content

Commit 42e5536

Browse files
committed
V65/processors (#3528)
* Add support for the dissect ingest processor * Add support for if/tag and ignore_failure on all processors fixes #3506 * Add support for the drop processor * Add support for the set_security_user processor * Add support for the pipeline processor * addressed PR comments regarding xmldocs and TODO statements (cherry picked from commit 5e7e391)
1 parent c11a6e3 commit 42e5536

File tree

8 files changed

+368
-2
lines changed

8 files changed

+368
-2
lines changed

src/Nest/Ingest/PipelineJsonConverter.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,18 @@ private List<IProcessor> GetProcessors(JToken jsonProcessors, JsonSerializer ser
110110
case "bytes":
111111
processors.Add(jsonProcessor.ToObject<BytesProcessor>(serializer));
112112
break;
113+
case "drop":
114+
processors.Add(jsonProcessor.ToObject<DropProcessor>(serializer));
115+
break;
116+
case "dissect":
117+
processors.Add(jsonProcessor.ToObject<DissectProcessor>(serializer));
118+
break;
119+
case "set_security_user":
120+
processors.Add(jsonProcessor.ToObject<SetSecurityUserProcessor>(serializer));
121+
break;
122+
case PipelineProcessor.ProcessorTypeName:
123+
processors.Add(jsonProcessor.ToObject<PipelineProcessor>(serializer));
124+
break;
113125
}
114126
}
115127
return processors;

src/Nest/Ingest/Processor.cs

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,56 @@
44

55
namespace Nest
66
{
7+
/// <summary> Ingest pipelines are composed of one or more processors </summary>
78
public interface IProcessor
89
{
10+
// TODO: even though this property is ignored it has a JsonProperty because our GetCachedProperties helper prefers
11+
// this property over the differently named subclass property that has JsonProperty("name"). Needs fixing outside the
12+
// scope of the current branch and warrants deeper investigation. Hence the current hack of __ignored__
13+
/// <summary> The name of the processor, will be used as the key when persisting the processor on the pipeline </summary>
14+
[JsonIgnore, JsonProperty("__ignored__")]
915
string Name { get; }
1016

17+
/// <summary>
18+
/// If a processor fails, call these processors instead. Read more about handling failures here:
19+
/// https://www.elastic.co/guide/en/elasticsearch/reference/current/handling-failure-in-pipelines.html
20+
/// </summary>
1121
[JsonProperty("on_failure")]
1222
IEnumerable<IProcessor> OnFailure { get; set; }
23+
24+
/// <summary> A painless script predicate that can control whether this processor should be executed or not </summary>
25+
[JsonProperty("if")]
26+
string If { get; set; }
27+
28+
/// <summary>
29+
/// A tag is simply a string identifier of the specific instantiation of a certain processor
30+
/// in a pipeline. The tag field does not affect the processor’s behavior, but is very useful
31+
/// for bookkeeping and tracing errors to specific processors.
32+
/// </summary>
33+
[JsonProperty("tag")]
34+
string Tag { get; set; }
35+
36+
/// <summary> When a failure happens, ignore it and proceed to the next processor </summary>
37+
[JsonProperty("ignore_failue")]
38+
bool? IgnoreFailure { get; set; }
1339
}
1440

41+
/// <inheritdoc cref="IProcessor"/>
1542
public abstract class ProcessorBase : IProcessor
1643
{
44+
/// <inheritdoc cref="IProcessor.If"/>
45+
public string If { get; set; }
46+
/// <inheritdoc cref="IProcessor.Tag"/>
47+
public string Tag { get; set; }
48+
/// <inheritdoc cref="IProcessor.IgnoreFailure"/>
49+
public bool? IgnoreFailure { get; set; }
50+
/// <inheritdoc cref="IProcessor.OnFailure"/>
1751
public IEnumerable<IProcessor> OnFailure { get; set; }
1852
protected abstract string Name { get; }
1953
string IProcessor.Name => Name;
2054
}
2155

56+
/// <inheritdoc cref="IProcessor"/>
2257
public abstract class ProcessorDescriptorBase<TProcessorDescriptor, TProcessorInterface>
2358
: DescriptorBase<TProcessorDescriptor, TProcessorInterface>, IProcessor
2459
where TProcessorDescriptor : ProcessorDescriptorBase<TProcessorDescriptor, TProcessorInterface>, TProcessorInterface
@@ -27,12 +62,26 @@ public abstract class ProcessorDescriptorBase<TProcessorDescriptor, TProcessorIn
2762
protected abstract string Name { get; }
2863
string IProcessor.Name => Name;
2964
IEnumerable<IProcessor> IProcessor.OnFailure { get; set; }
65+
string IProcessor.If { get; set; }
66+
string IProcessor.Tag { get; set; }
67+
bool? IProcessor.IgnoreFailure { get; set; }
3068

31-
/// <inheritdoc />
69+
/// <inheritdoc cref="IProcessor.OnFailure"/>
3270
public TProcessorDescriptor OnFailure(IEnumerable<IProcessor> processors) => Assign(a => a.OnFailure = processors.ToListOrNullIfEmpty());
3371

34-
/// <inheritdoc />
72+
/// <inheritdoc cref="IProcessor.OnFailure"/>
3573
public TProcessorDescriptor OnFailure(Func<ProcessorsDescriptor, IPromise<IList<IProcessor>>> selector) =>
3674
Assign(a => a.OnFailure = selector?.Invoke(new ProcessorsDescriptor())?.Value);
75+
76+
/// <inheritdoc cref="IProcessor.If"/>
77+
public TProcessorDescriptor If(string painlessPredicate) => Assign(a => a.If = painlessPredicate);
78+
79+
/// <inheritdoc cref="IProcessor.Tag"/>
80+
public TProcessorDescriptor Tag(string tag) => Assign(a => a.Tag = tag);
81+
82+
/// <inheritdoc cref="IProcessor.IgnoreFailure"/>
83+
public TProcessorDescriptor IgnoreFailure(bool? ignoreFailure = true) => Assign(a => a.IgnoreFailure = ignoreFailure);
84+
3785
}
86+
3887
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq.Expressions;
4+
using Newtonsoft.Json;
5+
6+
namespace Nest
7+
{
8+
/// <summary>
9+
/// Similar to the Grok Processor, dissect also extracts structured fields out of a single text field
10+
/// within a document. However unlike the Grok Processor, dissect does not use Regular Expressions.
11+
/// This allows dissect’s syntax to be simple and, for some cases faster, than the Grok Processor.
12+
/// </summary>
13+
[JsonObject(MemberSerialization.OptIn)]
14+
[JsonConverter(typeof(ProcessorJsonConverter<DissectProcessor>))]
15+
public interface IDissectProcessor : IProcessor
16+
{
17+
/// <summary> The field to dissect </summary>
18+
[JsonProperty("field")]
19+
Field Field { get; set; }
20+
21+
/// <summary> The pattern to apply to the field </summary>
22+
[JsonProperty("pattern")]
23+
string Pattern { get; set; }
24+
25+
/// <summary
26+
/// If true and field does not exist or is null, the processor quietly exits without modifying the document>
27+
/// </summary>
28+
[JsonProperty("ignore_missing")]
29+
bool? IgnoreMissing { get; set; }
30+
31+
/// <summary> The character(s) that separate the appended fields. </summary>
32+
[JsonProperty("append_separator")]
33+
string AppendSeparator { get; set; }
34+
}
35+
36+
/// <inheritdoc cref="IDissectProcessor"/>
37+
public class DissectProcessor : ProcessorBase, IDissectProcessor
38+
{
39+
protected override string Name => "dissect";
40+
41+
/// <inheritdoc cref="IDissectProcessor.Field">
42+
public Field Field { get; set; }
43+
44+
/// <inheritdoc cref="IDissectProcessor.Pattern">
45+
public string Pattern { get; set; }
46+
47+
/// <inheritdoc cref="IDissectProcessor.IgnoreMissing">
48+
public bool? IgnoreMissing { get; set; }
49+
50+
/// <inheritdoc cref="IDissectProcessor.AppendSeparator">
51+
public string AppendSeparator { get; set; }
52+
}
53+
54+
/// <inheritdoc cref="IDissectProcessor"/>
55+
public class DissectProcessorDescriptor<T>
56+
: ProcessorDescriptorBase<DissectProcessorDescriptor<T>, IDissectProcessor>, IDissectProcessor
57+
where T : class
58+
{
59+
protected override string Name => "dissect";
60+
61+
Field IDissectProcessor.Field { get; set; }
62+
string IDissectProcessor.Pattern { get; set; }
63+
bool? IDissectProcessor.IgnoreMissing { get; set; }
64+
string IDissectProcessor.AppendSeparator { get; set; }
65+
66+
/// <inheritdoc cref="IDissectProcessor.Field">
67+
public DissectProcessorDescriptor<T> Field(Field field) => Assign(a => a.Field = field);
68+
69+
/// <inheritdoc cref="IDissectProcessor.Field">
70+
public DissectProcessorDescriptor<T> Field(Expression<Func<T, object>> objectPath) =>
71+
Assign(a => a.Field = objectPath);
72+
73+
/// <inheritdoc cref="IDissectProcessor.Pattern">
74+
public DissectProcessorDescriptor<T> Pattern(string pattern) =>
75+
Assign(a => a.Pattern = pattern);
76+
77+
/// <inheritdoc cref="IDissectProcessor.IgnoreMissing">
78+
public DissectProcessorDescriptor<T> IgnoreMissing(bool? traceMatch = true) =>
79+
Assign(a => a.IgnoreMissing = traceMatch);
80+
81+
/// <inheritdoc cref="IDissectProcessor.AppendSeparator">
82+
public DissectProcessorDescriptor<T> AppendSeparator(string appendSeparator) =>
83+
Assign(a => a.AppendSeparator = appendSeparator);
84+
85+
}
86+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
using System;
2+
using System.Linq.Expressions;
3+
using Newtonsoft.Json;
4+
5+
namespace Nest
6+
{
7+
/// <summary>
8+
/// Drops the document without raising any errors. This is useful to prevent the document from getting indexed based on some condition.
9+
/// </summary>
10+
[JsonObject(MemberSerialization.OptIn)]
11+
[JsonConverter(typeof(ProcessorJsonConverter<DropProcessor>))]
12+
public interface IDropProcessor : IProcessor
13+
{
14+
}
15+
16+
/// <inheritdoc cref="IDropProcessor"/>
17+
public class DropProcessor : ProcessorBase, IDropProcessor
18+
{
19+
protected override string Name => "drop";
20+
}
21+
22+
/// <inheritdoc cref="IDropProcessor"/>
23+
public class DropProcessorDescriptor : ProcessorDescriptorBase<DropProcessorDescriptor, IDropProcessor>, IDropProcessor
24+
{
25+
protected override string Name => "drop";
26+
}
27+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using System;
2+
using System.Linq.Expressions;
3+
using Newtonsoft.Json;
4+
5+
namespace Nest
6+
{
7+
/// <summary> Executes another pipeline.</summary>
8+
[JsonObject(MemberSerialization.OptIn)]
9+
[JsonConverter(typeof(ProcessorJsonConverter<PipelineProcessor>))]
10+
public interface IPipelineProcessor : IProcessor
11+
{
12+
//TODO 7.x: this property clashes with the Name property on the IProcessor, need to rename base in master
13+
/// <summary>The name of the pipeline to execute. </summary>
14+
[JsonProperty("name")]
15+
string ProcessorName { get; set; }
16+
}
17+
18+
/// <inheritdoc cref="IPipelineProcessor" />
19+
public class PipelineProcessor : ProcessorBase, IPipelineProcessor
20+
{
21+
/// <inheritdoc cref="IPipelineProcessor.ProcessorName"/>
22+
[JsonProperty("name")]
23+
public string ProcessorName { get; set; }
24+
25+
internal const string ProcessorTypeName = "pipeline";
26+
protected override string Name => ProcessorTypeName;
27+
}
28+
29+
/// <inheritdoc cref="IPipelineProcessor" />
30+
public class PipelineProcessorDescriptor
31+
: ProcessorDescriptorBase<PipelineProcessorDescriptor, IPipelineProcessor>, IPipelineProcessor
32+
{
33+
protected override string Name => PipelineProcessor.ProcessorTypeName;
34+
string IPipelineProcessor.ProcessorName { get; set; }
35+
36+
/// <inheritdoc cref="IPipelineProcessor.ProcessorName"/>
37+
public PipelineProcessorDescriptor ProcessorName(string processorName) => Assign(a => a.ProcessorName = processorName);
38+
}
39+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using System;
2+
using System.Linq.Expressions;
3+
using Newtonsoft.Json;
4+
5+
namespace Nest
6+
{
7+
/// <summary>
8+
/// Sets user-related details (such as username, roles, email, full_name and metadata ) from the
9+
/// current authenticated user to the current document by pre-processing the ingest.
10+
/// </summary>
11+
[JsonObject(MemberSerialization.OptIn)]
12+
[JsonConverter(typeof(ProcessorJsonConverter<SetSecurityUserProcessor>))]
13+
public interface ISetSecurityUserProcessor : IProcessor
14+
{
15+
/// <summary>The field to store the user information into. </summary>
16+
[JsonProperty("field")]
17+
Field Field { get; set; }
18+
}
19+
20+
21+
/// <inheritdoc cref="ISetSecurityUserProcessor" />
22+
public class SetSecurityUserProcessor : ProcessorBase, ISetSecurityUserProcessor
23+
{
24+
/// <inheritdoc cref="ISetSecurityUserProcessor.Field"/>
25+
public Field Field { get; set; }
26+
protected override string Name => "set_security_user";
27+
}
28+
29+
/// <inheritdoc cref="ISetSecurityUserProcessor" />
30+
public class SetSecurityUserProcessorDescriptor<T>
31+
: ProcessorDescriptorBase<SetSecurityUserProcessorDescriptor<T>, ISetSecurityUserProcessor>, ISetSecurityUserProcessor
32+
where T : class
33+
{
34+
protected override string Name => "set_security_user";
35+
Field ISetSecurityUserProcessor.Field { get; set; }
36+
37+
/// <inheritdoc cref="ISetSecurityUserProcessor.Field"/>
38+
public SetSecurityUserProcessorDescriptor<T> Field(Field field) => Assign(a => a.Field = field);
39+
40+
/// <inheritdoc cref="ISetSecurityUserProcessor.Field"/>
41+
public SetSecurityUserProcessorDescriptor<T> Field(Expression<Func<T, object>> objectPath) =>
42+
Assign(a => a.Field = objectPath);
43+
}
44+
}

src/Nest/Ingest/ProcessorsDescriptor.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,5 +141,22 @@ public ProcessorsDescriptor UrlDecode<T>(Func<UrlDecodeProcessorDescriptor<T>, I
141141
/// <inheritdoc cref="IBytesProcessor" />
142142
public ProcessorsDescriptor Bytes<T>(Func<BytesProcessorDescriptor<T>, IBytesProcessor> selector) where T : class =>
143143
Assign(a => a.AddIfNotNull(selector?.Invoke(new BytesProcessorDescriptor<T>())));
144+
145+
/// <inheritdoc cref="IDissectProcessor" />
146+
public ProcessorsDescriptor Dissect<T>(Func<DissectProcessorDescriptor<T>, IDissectProcessor> selector) where T : class =>
147+
Assign(a => a.AddIfNotNull(selector?.Invoke(new DissectProcessorDescriptor<T>())));
148+
149+
/// <inheritdoc cref="IDropProcessor" />
150+
public ProcessorsDescriptor Drop(Func<DropProcessorDescriptor, IDropProcessor> selector) =>
151+
Assign(a => a.AddIfNotNull(selector?.Invoke(new DropProcessorDescriptor())));
152+
153+
/// <inheritdoc cref="ISetSecurityUserProcessor" />
154+
public ProcessorsDescriptor SetSecurityUser<T>(Func<SetSecurityUserProcessorDescriptor<T>, ISetSecurityUserProcessor> selector) where T : class =>
155+
Assign(a => a.AddIfNotNull(selector?.Invoke(new SetSecurityUserProcessorDescriptor<T>())));
156+
157+
/// <inheritdoc cref="IPipelineProcessor" />
158+
public ProcessorsDescriptor Pipeline(Func<PipelineProcessorDescriptor, IPipelineProcessor> selector) =>
159+
Assign(a => a.AddIfNotNull(selector?.Invoke(new PipelineProcessorDescriptor())));
160+
144161
}
145162
}

0 commit comments

Comments
 (0)