Skip to content

Add Csv processor #4374

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/Nest/Ingest/ProcessorFormatter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ internal class ProcessorFormatter : IJsonFormatter<IProcessor>
{ "drop", 29 },
{ "circle", 30 },
{ "enrich", 31 },
{ "csv", 32 },
};

public IProcessor Deserialize(ref JsonReader reader, IJsonFormatterResolver formatterResolver)
Expand Down Expand Up @@ -157,6 +158,9 @@ public IProcessor Deserialize(ref JsonReader reader, IJsonFormatterResolver form
case 31:
processor = Deserialize<EnrichProcessor>(ref reader, formatterResolver);
break;
case 32:
processor = Deserialize<CsvProcessor>(ref reader, formatterResolver);
break;
}
}
else
Expand Down Expand Up @@ -185,6 +189,9 @@ public void Serialize(ref JsonWriter writer, IProcessor value, IJsonFormatterRes
case "append":
Serialize<IAppendProcessor>(ref writer, value, formatterResolver);
break;
case "csv":
Serialize<ICsvProcessor>(ref writer, value, formatterResolver);
break;
case "convert":
Serialize<IConvertProcessor>(ref writer, value, formatterResolver);
break;
Expand Down
114 changes: 114 additions & 0 deletions src/Nest/Ingest/Processors/CsvProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using System;
using System.Linq.Expressions;
using System.Runtime.Serialization;
using Elasticsearch.Net;
using Elasticsearch.Net.Utf8Json;


namespace Nest
{
/// <summary>
/// Extracts fields from CSV line out of a single text field within a document.
/// Any empty field in CSV will be skipped.
/// <para></para>
/// Available in Elasticsearch 7.6.0+
/// </summary>
[InterfaceDataContract]
public interface ICsvProcessor : IProcessor
{
/// <summary>
/// The field to extract data from
/// </summary>
[DataMember(Name ="field")]
Field Field { get; set; }

/// <summary>
/// The array of fields to assign extracted values to.
/// </summary>
[DataMember(Name ="target_fields")]
Fields TargetFields { get; set; }

/// <summary>
/// Separator used in CSV, has to be single character string. Defaults to <c>,</c>
/// </summary>
[DataMember(Name = "separator")]
string Separator { get; set; }

/// <summary>
/// Quote used in CSV, has to be single character string. Defaults to <c>"</c>
/// </summary>
[DataMember(Name = "quote")]
string Quote { get; set; }

/// <summary>
/// If <c>true</c> and <see cref="Field" /> does not exist or is null,
/// the processor quietly exits without modifying the document. Default is <c>false</c>
/// </summary>
[DataMember(Name = "ignore_missing")]
bool? IgnoreMissing { get; set; }

/// <summary>
/// Trim whitespaces in unquoted fields. Default is <c>false</c>;
/// </summary>
[DataMember(Name = "trim")]
bool? Trim { get; set; }
}

/// <inheritdoc cref="ICsvProcessor"/>
public class CsvProcessor : ProcessorBase, ICsvProcessor
{
/// <inheritdoc />
public Field Field { get; set; }
/// <inheritdoc />
public Fields TargetFields { get; set; }
/// <inheritdoc />
public string Separator { get; set; }
/// <inheritdoc />
public string Quote { get; set; }
/// <inheritdoc />
public bool? IgnoreMissing { get; set; }
/// <inheritdoc />
public bool? Trim { get; set; }
/// <inheritdoc />
protected override string Name => "csv";
}

/// <inheritdoc cref="ICsvProcessor"/>
public class CsvProcessorDescriptor<T> : ProcessorDescriptorBase<CsvProcessorDescriptor<T>, ICsvProcessor>, ICsvProcessor
where T : class
{
protected override string Name => "csv";
Field ICsvProcessor.Field { get; set; }
Fields ICsvProcessor.TargetFields { get; set; }
bool? ICsvProcessor.IgnoreMissing { get; set; }
string ICsvProcessor.Quote { get; set; }
string ICsvProcessor.Separator { get; set; }
bool? ICsvProcessor.Trim { get; set; }

/// <inheritdoc cref="ICsvProcessor.Field" />
public CsvProcessorDescriptor<T> Field(Field field) => Assign(field, (a, v) => a.Field = v);

/// <inheritdoc cref="ICsvProcessor.Field" />
public CsvProcessorDescriptor<T> Field<TValue>(Expression<Func<T, TValue>> objectPath) =>
Assign(objectPath, (a, v) => a.Field = v);

/// <inheritdoc cref="ICsvProcessor.TargetFields" />
public CsvProcessorDescriptor<T> TargetFields(Func<FieldsDescriptor<T>, IPromise<Fields>> targetFields) =>
Assign(targetFields, (a, v) => a.TargetFields = v?.Invoke(new FieldsDescriptor<T>())?.Value);

/// <inheritdoc cref="ICsvProcessor.TargetFields" />
public CsvProcessorDescriptor<T> TargetFields(Fields targetFields) => Assign(targetFields, (a, v) => a.TargetFields = v);

/// <inheritdoc cref="ICsvProcessor.IgnoreMissing" />
public CsvProcessorDescriptor<T> IgnoreMissing(bool? ignoreMissing = true) => Assign(ignoreMissing, (a, v) => a.IgnoreMissing = v);

/// <inheritdoc cref="ICsvProcessor.Trim" />
public CsvProcessorDescriptor<T> Trim(bool? trim = true) => Assign(trim, (a, v) => a.Trim = v);

/// <inheritdoc cref="ICsvProcessor.Quote" />
public CsvProcessorDescriptor<T> Quote(string quote) => Assign(quote, (a, v) => a.Quote = v);

/// <inheritdoc cref="ICsvProcessor.Separator" />
public CsvProcessorDescriptor<T> Separator(string separator) => Assign(separator, (a, v) => a.Separator = v);
}
}
5 changes: 5 additions & 0 deletions src/Nest/Ingest/ProcessorsDescriptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ public ProcessorsDescriptor Attachment<T>(Func<AttachmentProcessorDescriptor<T>,
public ProcessorsDescriptor Append<T>(Func<AppendProcessorDescriptor<T>, IAppendProcessor> selector) where T : class =>
Assign(selector, (a, v) => a.AddIfNotNull(v?.Invoke(new AppendProcessorDescriptor<T>())));

/// <inheritdoc cref="ICsvProcessor"/>
public ProcessorsDescriptor Csv<T>(Func<CsvProcessorDescriptor<T>, ICsvProcessor> selector) where T : class =>
Assign(selector, (a, v) => a.AddIfNotNull(v?.Invoke(new CsvProcessorDescriptor<T>())));


/// <inheritdoc cref="IConvertProcessor"/>
public ProcessorsDescriptor Convert<T>(Func<ConvertProcessorDescriptor<T>, IConvertProcessor> selector) where T : class =>
Assign(selector, (a, v) => a.AddIfNotNull(v?.Invoke(new ConvertProcessorDescriptor<T>())));
Expand Down
24 changes: 24 additions & 0 deletions tests/Tests/Ingest/ProcessorAssertions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,30 @@ public class Append : ProcessorAssertion
public override string Key => "append";
}

[SkipVersion("<7.6.0", "Introduced in Elasticsearch 7.6.0+")]
public class Csv : ProcessorAssertion
{
public override Func<ProcessorsDescriptor, IPromise<IList<IProcessor>>> Fluent => d => d
.Csv<Project>(c => c
.Field(p => p.Name)
.TargetFields(new[] { "targetField1", "targetField2" })
);

public override IProcessor Initializer => new CsvProcessor
{
Field = "name",
TargetFields = new[] { "targetField1", "targetField2" },
};

public override object Json => new
{
field = "name",
target_fields = new[] { "targetField1", "targetField2" },
};

public override string Key => "csv";
}

public class Convert : ProcessorAssertion
{
public override Func<ProcessorsDescriptor, IPromise<IList<IProcessor>>> Fluent => d => d
Expand Down