Skip to content

Commit f91ccda

Browse files
authored
Add Csv processor (#4374)
Relates: #4341 This commit adds the Csv processor to the collection of ingest processors.
1 parent 9b024e2 commit f91ccda

File tree

4 files changed

+150
-0
lines changed

4 files changed

+150
-0
lines changed

src/Nest/Ingest/ProcessorFormatter.cs

+7
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ internal class ProcessorFormatter : IJsonFormatter<IProcessor>
4141
{ "drop", 29 },
4242
{ "circle", 30 },
4343
{ "enrich", 31 },
44+
{ "csv", 32 },
4445
};
4546

4647
public IProcessor Deserialize(ref JsonReader reader, IJsonFormatterResolver formatterResolver)
@@ -157,6 +158,9 @@ public IProcessor Deserialize(ref JsonReader reader, IJsonFormatterResolver form
157158
case 31:
158159
processor = Deserialize<EnrichProcessor>(ref reader, formatterResolver);
159160
break;
161+
case 32:
162+
processor = Deserialize<CsvProcessor>(ref reader, formatterResolver);
163+
break;
160164
}
161165
}
162166
else
@@ -185,6 +189,9 @@ public void Serialize(ref JsonWriter writer, IProcessor value, IJsonFormatterRes
185189
case "append":
186190
Serialize<IAppendProcessor>(ref writer, value, formatterResolver);
187191
break;
192+
case "csv":
193+
Serialize<ICsvProcessor>(ref writer, value, formatterResolver);
194+
break;
188195
case "convert":
189196
Serialize<IConvertProcessor>(ref writer, value, formatterResolver);
190197
break;
+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
using System;
2+
using System.Linq.Expressions;
3+
using System.Runtime.Serialization;
4+
using Elasticsearch.Net;
5+
using Elasticsearch.Net.Utf8Json;
6+
7+
8+
namespace Nest
9+
{
10+
/// <summary>
11+
/// Extracts fields from CSV line out of a single text field within a document.
12+
/// Any empty field in CSV will be skipped.
13+
/// <para></para>
14+
/// Available in Elasticsearch 7.6.0+
15+
/// </summary>
16+
[InterfaceDataContract]
17+
public interface ICsvProcessor : IProcessor
18+
{
19+
/// <summary>
20+
/// The field to extract data from
21+
/// </summary>
22+
[DataMember(Name ="field")]
23+
Field Field { get; set; }
24+
25+
/// <summary>
26+
/// The array of fields to assign extracted values to.
27+
/// </summary>
28+
[DataMember(Name ="target_fields")]
29+
Fields TargetFields { get; set; }
30+
31+
/// <summary>
32+
/// Separator used in CSV, has to be single character string. Defaults to <c>,</c>
33+
/// </summary>
34+
[DataMember(Name = "separator")]
35+
string Separator { get; set; }
36+
37+
/// <summary>
38+
/// Quote used in CSV, has to be single character string. Defaults to <c>"</c>
39+
/// </summary>
40+
[DataMember(Name = "quote")]
41+
string Quote { get; set; }
42+
43+
/// <summary>
44+
/// If <c>true</c> and <see cref="Field" /> does not exist or is null,
45+
/// the processor quietly exits without modifying the document. Default is <c>false</c>
46+
/// </summary>
47+
[DataMember(Name = "ignore_missing")]
48+
bool? IgnoreMissing { get; set; }
49+
50+
/// <summary>
51+
/// Trim whitespaces in unquoted fields. Default is <c>false</c>;
52+
/// </summary>
53+
[DataMember(Name = "trim")]
54+
bool? Trim { get; set; }
55+
}
56+
57+
/// <inheritdoc cref="ICsvProcessor"/>
58+
public class CsvProcessor : ProcessorBase, ICsvProcessor
59+
{
60+
/// <inheritdoc />
61+
public Field Field { get; set; }
62+
/// <inheritdoc />
63+
public Fields TargetFields { get; set; }
64+
/// <inheritdoc />
65+
public string Separator { get; set; }
66+
/// <inheritdoc />
67+
public string Quote { get; set; }
68+
/// <inheritdoc />
69+
public bool? IgnoreMissing { get; set; }
70+
/// <inheritdoc />
71+
public bool? Trim { get; set; }
72+
/// <inheritdoc />
73+
protected override string Name => "csv";
74+
}
75+
76+
/// <inheritdoc cref="ICsvProcessor"/>
77+
public class CsvProcessorDescriptor<T> : ProcessorDescriptorBase<CsvProcessorDescriptor<T>, ICsvProcessor>, ICsvProcessor
78+
where T : class
79+
{
80+
protected override string Name => "csv";
81+
Field ICsvProcessor.Field { get; set; }
82+
Fields ICsvProcessor.TargetFields { get; set; }
83+
bool? ICsvProcessor.IgnoreMissing { get; set; }
84+
string ICsvProcessor.Quote { get; set; }
85+
string ICsvProcessor.Separator { get; set; }
86+
bool? ICsvProcessor.Trim { get; set; }
87+
88+
/// <inheritdoc cref="ICsvProcessor.Field" />
89+
public CsvProcessorDescriptor<T> Field(Field field) => Assign(field, (a, v) => a.Field = v);
90+
91+
/// <inheritdoc cref="ICsvProcessor.Field" />
92+
public CsvProcessorDescriptor<T> Field<TValue>(Expression<Func<T, TValue>> objectPath) =>
93+
Assign(objectPath, (a, v) => a.Field = v);
94+
95+
/// <inheritdoc cref="ICsvProcessor.TargetFields" />
96+
public CsvProcessorDescriptor<T> TargetFields(Func<FieldsDescriptor<T>, IPromise<Fields>> targetFields) =>
97+
Assign(targetFields, (a, v) => a.TargetFields = v?.Invoke(new FieldsDescriptor<T>())?.Value);
98+
99+
/// <inheritdoc cref="ICsvProcessor.TargetFields" />
100+
public CsvProcessorDescriptor<T> TargetFields(Fields targetFields) => Assign(targetFields, (a, v) => a.TargetFields = v);
101+
102+
/// <inheritdoc cref="ICsvProcessor.IgnoreMissing" />
103+
public CsvProcessorDescriptor<T> IgnoreMissing(bool? ignoreMissing = true) => Assign(ignoreMissing, (a, v) => a.IgnoreMissing = v);
104+
105+
/// <inheritdoc cref="ICsvProcessor.Trim" />
106+
public CsvProcessorDescriptor<T> Trim(bool? trim = true) => Assign(trim, (a, v) => a.Trim = v);
107+
108+
/// <inheritdoc cref="ICsvProcessor.Quote" />
109+
public CsvProcessorDescriptor<T> Quote(string quote) => Assign(quote, (a, v) => a.Quote = v);
110+
111+
/// <inheritdoc cref="ICsvProcessor.Separator" />
112+
public CsvProcessorDescriptor<T> Separator(string separator) => Assign(separator, (a, v) => a.Separator = v);
113+
}
114+
}

src/Nest/Ingest/ProcessorsDescriptor.cs

+5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ public ProcessorsDescriptor Attachment<T>(Func<AttachmentProcessorDescriptor<T>,
2222
public ProcessorsDescriptor Append<T>(Func<AppendProcessorDescriptor<T>, IAppendProcessor> selector) where T : class =>
2323
Assign(selector, (a, v) => a.AddIfNotNull(v?.Invoke(new AppendProcessorDescriptor<T>())));
2424

25+
/// <inheritdoc cref="ICsvProcessor"/>
26+
public ProcessorsDescriptor Csv<T>(Func<CsvProcessorDescriptor<T>, ICsvProcessor> selector) where T : class =>
27+
Assign(selector, (a, v) => a.AddIfNotNull(v?.Invoke(new CsvProcessorDescriptor<T>())));
28+
29+
2530
/// <inheritdoc cref="IConvertProcessor"/>
2631
public ProcessorsDescriptor Convert<T>(Func<ConvertProcessorDescriptor<T>, IConvertProcessor> selector) where T : class =>
2732
Assign(selector, (a, v) => a.AddIfNotNull(v?.Invoke(new ConvertProcessorDescriptor<T>())));

tests/Tests/Ingest/ProcessorAssertions.cs

+24
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,30 @@ public class Append : ProcessorAssertion
7070
public override string Key => "append";
7171
}
7272

73+
[SkipVersion("<7.6.0", "Introduced in Elasticsearch 7.6.0+")]
74+
public class Csv : ProcessorAssertion
75+
{
76+
public override Func<ProcessorsDescriptor, IPromise<IList<IProcessor>>> Fluent => d => d
77+
.Csv<Project>(c => c
78+
.Field(p => p.Name)
79+
.TargetFields(new[] { "targetField1", "targetField2" })
80+
);
81+
82+
public override IProcessor Initializer => new CsvProcessor
83+
{
84+
Field = "name",
85+
TargetFields = new[] { "targetField1", "targetField2" },
86+
};
87+
88+
public override object Json => new
89+
{
90+
field = "name",
91+
target_fields = new[] { "targetField1", "targetField2" },
92+
};
93+
94+
public override string Key => "csv";
95+
}
96+
7397
public class Convert : ProcessorAssertion
7498
{
7599
public override Func<ProcessorsDescriptor, IPromise<IList<IProcessor>>> Fluent => d => d

0 commit comments

Comments
 (0)