Skip to content

V65/processors #3528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/Nest/Ingest/PipelineJsonConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ private List<IProcessor> GetProcessors(JToken jsonProcessors, JsonSerializer ser
case "bytes":
processors.Add(jsonProcessor.ToObject<BytesProcessor>(serializer));
break;
case "drop":
processors.Add(jsonProcessor.ToObject<DropProcessor>(serializer));
break;
case "dissect":
processors.Add(jsonProcessor.ToObject<DissectProcessor>(serializer));
break;
case "set_security_user":
processors.Add(jsonProcessor.ToObject<SetSecurityUserProcessor>(serializer));
break;
case PipelineProcessor.ProcessorTypeName:
processors.Add(jsonProcessor.ToObject<PipelineProcessor>(serializer));
break;
}
}
return processors;
Expand Down
53 changes: 51 additions & 2 deletions src/Nest/Ingest/Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,56 @@

namespace Nest
{
/// <summary> Ingest pipelines are composed of one or more processors </summary>
public interface IProcessor
{
// TODO: even though this property is ignored it has a JsonProperty because our GetCachedProperties helper prefers
// this property over the differently named subclass property that has JsonProperty("name"). Needs fixing outside the
// scope of the current branch and warrants deeper investigation. Hence the current hack of __ignored__
/// <summary> The name of the processor, will be used as the key when persisting the processor on the pipeline </summary>
[JsonIgnore, JsonProperty("__ignored__")]
string Name { get; }

/// <summary>
/// If a processor fails, call these processors instead. Read more about handling failures here:
/// https://www.elastic.co/guide/en/elasticsearch/reference/current/handling-failure-in-pipelines.html
/// </summary>
[JsonProperty("on_failure")]
IEnumerable<IProcessor> OnFailure { get; set; }

/// <summary> A painless script predicate that can control whether this processor should be executed or not </summary>
[JsonProperty("if")]
string If { get; set; }

/// <summary>
/// A tag is simply a string identifier of the specific instantiation of a certain processor
/// in a pipeline. The tag field does not affect the processor’s behavior, but is very useful
/// for bookkeeping and tracing errors to specific processors.
/// </summary>
[JsonProperty("tag")]
string Tag { get; set; }

/// <summary> When a failure happens, ignore it and proceed to the next processor </summary>
[JsonProperty("ignore_failue")]
bool? IgnoreFailure { get; set; }
}

/// <inheritdoc cref="IProcessor"/>
public abstract class ProcessorBase : IProcessor
{
/// <inheritdoc cref="IProcessor.If"/>
public string If { get; set; }
/// <inheritdoc cref="IProcessor.Tag"/>
public string Tag { get; set; }
/// <inheritdoc cref="IProcessor.IgnoreFailure"/>
public bool? IgnoreFailure { get; set; }
/// <inheritdoc cref="IProcessor.OnFailure"/>
public IEnumerable<IProcessor> OnFailure { get; set; }
protected abstract string Name { get; }
string IProcessor.Name => Name;
}

/// <inheritdoc cref="IProcessor"/>
public abstract class ProcessorDescriptorBase<TProcessorDescriptor, TProcessorInterface>
: DescriptorBase<TProcessorDescriptor, TProcessorInterface>, IProcessor
where TProcessorDescriptor : ProcessorDescriptorBase<TProcessorDescriptor, TProcessorInterface>, TProcessorInterface
Expand All @@ -27,12 +62,26 @@ public abstract class ProcessorDescriptorBase<TProcessorDescriptor, TProcessorIn
protected abstract string Name { get; }
string IProcessor.Name => Name;
IEnumerable<IProcessor> IProcessor.OnFailure { get; set; }
string IProcessor.If { get; set; }
string IProcessor.Tag { get; set; }
bool? IProcessor.IgnoreFailure { get; set; }

/// <inheritdoc />
/// <inheritdoc cref="IProcessor.OnFailure"/>
public TProcessorDescriptor OnFailure(IEnumerable<IProcessor> processors) => Assign(a => a.OnFailure = processors.ToListOrNullIfEmpty());

/// <inheritdoc />
/// <inheritdoc cref="IProcessor.OnFailure"/>
public TProcessorDescriptor OnFailure(Func<ProcessorsDescriptor, IPromise<IList<IProcessor>>> selector) =>
Assign(a => a.OnFailure = selector?.Invoke(new ProcessorsDescriptor())?.Value);

/// <inheritdoc cref="IProcessor.If"/>
public TProcessorDescriptor If(string painlessPredicate) => Assign(a => a.If = painlessPredicate);

/// <inheritdoc cref="IProcessor.Tag"/>
public TProcessorDescriptor Tag(string tag) => Assign(a => a.Tag = tag);

/// <inheritdoc cref="IProcessor.IgnoreFailure"/>
public TProcessorDescriptor IgnoreFailure(bool? ignoreFailure = true) => Assign(a => a.IgnoreFailure = ignoreFailure);

}

}
86 changes: 86 additions & 0 deletions src/Nest/Ingest/Processors/DissectProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using Newtonsoft.Json;

namespace Nest
{
/// <summary>
/// Similar to the Grok Processor, dissect also extracts structured fields out of a single text field
/// within a document. However unlike the Grok Processor, dissect does not use Regular Expressions.
/// This allows dissect’s syntax to be simple and, for some cases faster, than the Grok Processor.
/// </summary>
[JsonObject(MemberSerialization.OptIn)]
[JsonConverter(typeof(ProcessorJsonConverter<DissectProcessor>))]
public interface IDissectProcessor : IProcessor
{
/// <summary> The field to dissect </summary>
[JsonProperty("field")]
Field Field { get; set; }

/// <summary> The pattern to apply to the field </summary>
[JsonProperty("pattern")]
string Pattern { get; set; }

/// <summary
/// If true and field does not exist or is null, the processor quietly exits without modifying the document>
/// </summary>
[JsonProperty("ignore_missing")]
bool? IgnoreMissing { get; set; }

/// <summary> The character(s) that separate the appended fields. </summary>
[JsonProperty("append_separator")]
string AppendSeparator { get; set; }
}

/// <inheritdoc cref="IDissectProcessor"/>
public class DissectProcessor : ProcessorBase, IDissectProcessor
{
protected override string Name => "dissect";

/// <inheritdoc cref="IDissectProcessor.Field">
public Field Field { get; set; }

/// <inheritdoc cref="IDissectProcessor.Pattern">
public string Pattern { get; set; }

/// <inheritdoc cref="IDissectProcessor.IgnoreMissing">
public bool? IgnoreMissing { get; set; }

/// <inheritdoc cref="IDissectProcessor.AppendSeparator">
public string AppendSeparator { get; set; }
}

/// <inheritdoc cref="IDissectProcessor"/>
public class DissectProcessorDescriptor<T>
: ProcessorDescriptorBase<DissectProcessorDescriptor<T>, IDissectProcessor>, IDissectProcessor
where T : class
{
protected override string Name => "dissect";

Field IDissectProcessor.Field { get; set; }
string IDissectProcessor.Pattern { get; set; }
bool? IDissectProcessor.IgnoreMissing { get; set; }
string IDissectProcessor.AppendSeparator { get; set; }

/// <inheritdoc cref="IDissectProcessor.Field">
public DissectProcessorDescriptor<T> Field(Field field) => Assign(a => a.Field = field);

/// <inheritdoc cref="IDissectProcessor.Field">
public DissectProcessorDescriptor<T> Field(Expression<Func<T, object>> objectPath) =>
Assign(a => a.Field = objectPath);

/// <inheritdoc cref="IDissectProcessor.Pattern">
public DissectProcessorDescriptor<T> Pattern(string pattern) =>
Assign(a => a.Pattern = pattern);

/// <inheritdoc cref="IDissectProcessor.IgnoreMissing">
public DissectProcessorDescriptor<T> IgnoreMissing(bool? traceMatch = true) =>
Assign(a => a.IgnoreMissing = traceMatch);

/// <inheritdoc cref="IDissectProcessor.AppendSeparator">
public DissectProcessorDescriptor<T> AppendSeparator(string appendSeparator) =>
Assign(a => a.AppendSeparator = appendSeparator);

}
}
27 changes: 27 additions & 0 deletions src/Nest/Ingest/Processors/DropProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;
using System.Linq.Expressions;
using Newtonsoft.Json;

namespace Nest
{
/// <summary>
/// Drops the document without raising any errors. This is useful to prevent the document from getting indexed based on some condition.
/// </summary>
[JsonObject(MemberSerialization.OptIn)]
[JsonConverter(typeof(ProcessorJsonConverter<DropProcessor>))]
public interface IDropProcessor : IProcessor
{
}

/// <inheritdoc cref="IDropProcessor"/>
public class DropProcessor : ProcessorBase, IDropProcessor
{
protected override string Name => "drop";
}

/// <inheritdoc cref="IDropProcessor"/>
public class DropProcessorDescriptor : ProcessorDescriptorBase<DropProcessorDescriptor, IDropProcessor>, IDropProcessor
{
protected override string Name => "drop";
}
}
39 changes: 39 additions & 0 deletions src/Nest/Ingest/Processors/PipelineProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;
using System.Linq.Expressions;
using Newtonsoft.Json;

namespace Nest
{
/// <summary> Executes another pipeline.</summary>
[JsonObject(MemberSerialization.OptIn)]
[JsonConverter(typeof(ProcessorJsonConverter<PipelineProcessor>))]
public interface IPipelineProcessor : IProcessor
{
//TODO 7.x: this property clashes with the Name property on the IProcessor, need to rename base in master
/// <summary>The name of the pipeline to execute. </summary>
[JsonProperty("name")]
string ProcessorName { get; set; }
}

/// <inheritdoc cref="IPipelineProcessor" />
public class PipelineProcessor : ProcessorBase, IPipelineProcessor
{
/// <inheritdoc cref="IPipelineProcessor.ProcessorName"/>
[JsonProperty("name")]
public string ProcessorName { get; set; }

internal const string ProcessorTypeName = "pipeline";
protected override string Name => ProcessorTypeName;
}

/// <inheritdoc cref="IPipelineProcessor" />
public class PipelineProcessorDescriptor
: ProcessorDescriptorBase<PipelineProcessorDescriptor, IPipelineProcessor>, IPipelineProcessor
{
protected override string Name => PipelineProcessor.ProcessorTypeName;
string IPipelineProcessor.ProcessorName { get; set; }

/// <inheritdoc cref="IPipelineProcessor.ProcessorName"/>
public PipelineProcessorDescriptor ProcessorName(string processorName) => Assign(a => a.ProcessorName = processorName);
}
}
44 changes: 44 additions & 0 deletions src/Nest/Ingest/Processors/SetSecurityUserProcessor .cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using System.Linq.Expressions;
using Newtonsoft.Json;

namespace Nest
{
/// <summary>
/// Sets user-related details (such as username, roles, email, full_name and metadata ) from the
/// current authenticated user to the current document by pre-processing the ingest.
/// </summary>
[JsonObject(MemberSerialization.OptIn)]
[JsonConverter(typeof(ProcessorJsonConverter<SetSecurityUserProcessor>))]
public interface ISetSecurityUserProcessor : IProcessor
{
/// <summary>The field to store the user information into. </summary>
[JsonProperty("field")]
Field Field { get; set; }
}


/// <inheritdoc cref="ISetSecurityUserProcessor" />
public class SetSecurityUserProcessor : ProcessorBase, ISetSecurityUserProcessor
{
/// <inheritdoc cref="ISetSecurityUserProcessor.Field"/>
public Field Field { get; set; }
protected override string Name => "set_security_user";
}

/// <inheritdoc cref="ISetSecurityUserProcessor" />
public class SetSecurityUserProcessorDescriptor<T>
: ProcessorDescriptorBase<SetSecurityUserProcessorDescriptor<T>, ISetSecurityUserProcessor>, ISetSecurityUserProcessor
where T : class
{
protected override string Name => "set_security_user";
Field ISetSecurityUserProcessor.Field { get; set; }

/// <inheritdoc cref="ISetSecurityUserProcessor.Field"/>
public SetSecurityUserProcessorDescriptor<T> Field(Field field) => Assign(a => a.Field = field);

/// <inheritdoc cref="ISetSecurityUserProcessor.Field"/>
public SetSecurityUserProcessorDescriptor<T> Field(Expression<Func<T, object>> objectPath) =>
Assign(a => a.Field = objectPath);
}
}
17 changes: 17 additions & 0 deletions src/Nest/Ingest/ProcessorsDescriptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,5 +141,22 @@ public ProcessorsDescriptor UrlDecode<T>(Func<UrlDecodeProcessorDescriptor<T>, I
/// <inheritdoc cref="IBytesProcessor" />
public ProcessorsDescriptor Bytes<T>(Func<BytesProcessorDescriptor<T>, IBytesProcessor> selector) where T : class =>
Assign(a => a.AddIfNotNull(selector?.Invoke(new BytesProcessorDescriptor<T>())));

/// <inheritdoc cref="IDissectProcessor" />
public ProcessorsDescriptor Dissect<T>(Func<DissectProcessorDescriptor<T>, IDissectProcessor> selector) where T : class =>
Assign(a => a.AddIfNotNull(selector?.Invoke(new DissectProcessorDescriptor<T>())));

/// <inheritdoc cref="IDropProcessor" />
public ProcessorsDescriptor Drop(Func<DropProcessorDescriptor, IDropProcessor> selector) =>
Assign(a => a.AddIfNotNull(selector?.Invoke(new DropProcessorDescriptor())));

/// <inheritdoc cref="ISetSecurityUserProcessor" />
public ProcessorsDescriptor SetSecurityUser<T>(Func<SetSecurityUserProcessorDescriptor<T>, ISetSecurityUserProcessor> selector) where T : class =>
Assign(a => a.AddIfNotNull(selector?.Invoke(new SetSecurityUserProcessorDescriptor<T>())));

/// <inheritdoc cref="IPipelineProcessor" />
public ProcessorsDescriptor Pipeline(Func<PipelineProcessorDescriptor, IPipelineProcessor> selector) =>
Assign(a => a.AddIfNotNull(selector?.Invoke(new PipelineProcessorDescriptor())));

}
}
Loading