Skip to content

Commit 5560135

Browse files
Add empty_value parameter to CSV processor (#51567)
* Add empty_value parameter to CSV processor This change adds `empty_value` parameter to the CSV processor. This value is used to fill empty fields. Fields will be skipped if this parameter is omitted. This behavior is the same for both quoted and unquoted fields. * docs updated * Fix compilation problem Co-authored-by: Elastic Machine <[email protected]>
1 parent 14900c2 commit 5560135

File tree

4 files changed

+108
-29
lines changed

4 files changed

+108
-29
lines changed

docs/reference/ingest/processors/csv.asciidoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ Extracts fields from CSV line out of a single text field within a document. Any
1313
| `quote` | no | " | Quote used in CSV, has to be single character string
1414
| `ignore_missing` | no | `true` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
1515
| `trim` | no | `false` | Trim whitespaces in unquoted fields
16+
| `empty_value` | no | - | Value used to fill empty fields. Empty fields are skipped if this is not provided.
17+
An empty field is one with no value (2 consecutive separators) or empty quotes (`""`)
1618
include::common-options.asciidoc[]
1719
|======
1820

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvParser.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ private enum State {
3636
private final char separator;
3737
private final boolean trim;
3838
private final String[] headers;
39+
private final Object emptyValue;
3940
private final IngestDocument ingestDocument;
4041
private final StringBuilder builder = new StringBuilder();
4142
private State state = State.START;
@@ -45,12 +46,13 @@ private enum State {
4546
private int length;
4647
private int currentIndex;
4748

48-
CsvParser(IngestDocument ingestDocument, char quote, char separator, boolean trim, String[] headers) {
49+
CsvParser(IngestDocument ingestDocument, char quote, char separator, boolean trim, String[] headers, Object emptyValue) {
4950
this.ingestDocument = ingestDocument;
5051
this.quote = quote;
5152
this.separator = separator;
5253
this.trim = trim;
5354
this.headers = headers;
55+
this.emptyValue = emptyValue;
5456
}
5557

5658
void process(String line) {
@@ -102,7 +104,8 @@ private boolean processStart() {
102104
return false;
103105
} else if (c == separator) {
104106
startIndex++;
105-
if (nextHeader()) {
107+
builder.setLength(0);
108+
if (setField(startIndex)) {
106109
return true;
107110
}
108111
} else if (isWhitespace(c)) {
@@ -190,16 +193,17 @@ private boolean isWhitespace(char c) {
190193
}
191194

192195
private boolean setField(int endIndex) {
196+
String value;
193197
if (builder.length() == 0) {
194-
ingestDocument.setFieldValue(headers[currentHeader], line.substring(startIndex, endIndex));
198+
value = line.substring(startIndex, endIndex);
195199
} else {
196-
builder.append(line, startIndex, endIndex);
197-
ingestDocument.setFieldValue(headers[currentHeader], builder.toString());
200+
value = builder.append(line, startIndex, endIndex).toString();
201+
}
202+
if (value.length() > 0) {
203+
ingestDocument.setFieldValue(headers[currentHeader], value);
204+
} else if (emptyValue != null) {
205+
ingestDocument.setFieldValue(headers[currentHeader], emptyValue);
198206
}
199-
return nextHeader();
200-
}
201-
202-
private boolean nextHeader() {
203207
currentHeader++;
204208
return currentHeader == headers.length;
205209
}

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvProcessor.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,14 @@
3232
* A processor that breaks line from CSV file into separate fields.
3333
* If there are more fields requested than there are in the CSV, the extra fields will not be present in the document after processing.
3434
* In the same way this processor will skip any field that is empty in CSV.
35-
*
35+
* <p>
3636
* By default it uses rules according to <a href="https://tools.ietf.org/html/rfc4180">RFC 4180</a> with one exception: whitespaces are
3737
* allowed before or after quoted field. Processor can be tweaked with following parameters:
38-
*
38+
* <p>
3939
* quote: set custom quote character (defaults to ")
4040
* separator: set custom separator (defaults to ,)
4141
* trim: trim leading and trailing whitespaces in unquoted fields
42+
* empty_value: sets custom value to use for empty fields (field is skipped if null)
4243
*/
4344
public final class CsvProcessor extends AbstractProcessor {
4445

@@ -50,15 +51,18 @@ public final class CsvProcessor extends AbstractProcessor {
5051
private final char quote;
5152
private final char separator;
5253
private final boolean ignoreMissing;
54+
private final Object emptyValue;
5355

54-
CsvProcessor(String tag, String field, String[] headers, boolean trim, char separator, char quote, boolean ignoreMissing) {
56+
CsvProcessor(String tag, String field, String[] headers, boolean trim, char separator, char quote, boolean ignoreMissing,
57+
Object emptyValue) {
5558
super(tag);
5659
this.field = field;
5760
this.headers = headers;
5861
this.trim = trim;
5962
this.quote = quote;
6063
this.separator = separator;
6164
this.ignoreMissing = ignoreMissing;
65+
this.emptyValue = emptyValue;
6266
}
6367

6468
@Override
@@ -73,7 +77,7 @@ public IngestDocument execute(IngestDocument ingestDocument) {
7377
} else if (line == null) {
7478
throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
7579
}
76-
new CsvParser(ingestDocument, quote, separator, trim, headers).process(line);
80+
new CsvParser(ingestDocument, quote, separator, trim, headers, emptyValue).process(line);
7781
return ingestDocument;
7882
}
7983

@@ -96,13 +100,17 @@ public CsvProcessor create(Map<String, org.elasticsearch.ingest.Processor.Factor
96100
throw newConfigurationException(TYPE, processorTag, "separator", "separator has to be single character like , or ;");
97101
}
98102
boolean trim = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "trim", false);
103+
Object emptyValue = null;
104+
if (config.containsKey("empty_value")) { // check the same key that readObject consumes below
105+
emptyValue = ConfigurationUtils.readObject(TYPE, processorTag, config, "empty_value");
106+
}
99107
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
100108
List<String> targetFields = ConfigurationUtils.readList(TYPE, processorTag, config, "target_fields");
101109
if (targetFields.isEmpty()) {
102110
throw newConfigurationException(TYPE, processorTag, "target_fields", "target fields list can't be empty");
103111
}
104112
return new CsvProcessor(processorTag, field, targetFields.toArray(String[]::new), trim, separator.charAt(0), quote.charAt(0),
105-
ignoreMissing);
113+
ignoreMissing, emptyValue);
106114
}
107115
}
108116
}

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/CsvProcessorTests.java

Lines changed: 80 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public void setup() {
5353
separator = randomFrom(SEPARATORS);
5454
}
5555

56-
public void testExactNumberOfFields() throws Exception {
56+
public void testExactNumberOfFields() {
5757
int numItems = randomIntBetween(2, 10);
5858
Map<String, String> items = new LinkedHashMap<>();
5959
for (int i = 0; i < numItems; i++) {
@@ -67,7 +67,67 @@ public void testExactNumberOfFields() throws Exception {
6767
items.forEach((key, value) -> assertEquals(value, ingestDocument.getFieldValue(key, String.class)));
6868
}
6969

70-
public void testLessFieldsThanHeaders() throws Exception {
70+
public void testEmptyValues() {
71+
int numItems = randomIntBetween(5, 10);
72+
Map<String, String> items = new LinkedHashMap<>();
73+
for (int i = 0; i < 3; i++) {
74+
items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
75+
}
76+
String emptyKey = randomAlphaOfLengthBetween(5, 10);
77+
items.put(emptyKey, "");
78+
for (int i = 0; i < numItems - 4; i++) {
79+
items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
80+
}
81+
String[] headers = items.keySet().toArray(new String[numItems]);
82+
String csv = items.values().stream().map(v -> quote + v + quote).collect(Collectors.joining(separator + ""));
83+
84+
IngestDocument ingestDocument = processDocument(headers, csv);
85+
86+
items.forEach((key, value) -> {
87+
if (emptyKey.equals(key)) {
88+
assertFalse(ingestDocument.hasField(key));
89+
} else {
90+
assertEquals(value, ingestDocument.getFieldValue(key, String.class));
91+
}
92+
});
93+
}
94+
95+
public void testEmptyValuesReplace() {
96+
int numItems = randomIntBetween(5, 10);
97+
Map<String, String> items = new LinkedHashMap<>();
98+
for (int i = 0; i < 3; i++) {
99+
items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
100+
}
101+
String emptyKey = randomAlphaOfLengthBetween(5, 10);
102+
items.put(emptyKey, "");
103+
for (int i = 0; i < numItems - 4; i++) {
104+
items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
105+
}
106+
String[] headers = items.keySet().toArray(new String[numItems]);
107+
String csv = items.values().stream().map(v -> quote + v + quote).collect(Collectors.joining(separator + ""));
108+
109+
IngestDocument ingestDocument = processDocument(headers, csv, true, "");
110+
111+
items.forEach((key, value) -> {
112+
if (emptyKey.equals(key)) {
113+
assertEquals("", ingestDocument.getFieldValue(key, String.class));
114+
} else {
115+
assertEquals(value, ingestDocument.getFieldValue(key, String.class));
116+
}
117+
});
118+
119+
IngestDocument ingestDocument2 = processDocument(headers, csv, true, 0);
120+
121+
items.forEach((key, value) -> {
122+
if (emptyKey.equals(key)) {
123+
assertEquals(0, (int) ingestDocument2.getFieldValue(key, Integer.class));
124+
} else {
125+
assertEquals(value, ingestDocument2.getFieldValue(key, String.class));
126+
}
127+
});
128+
}
129+
130+
public void testLessFieldsThanHeaders() {
71131
int numItems = randomIntBetween(4, 10);
72132
Map<String, String> items = new LinkedHashMap<>();
73133
for (int i = 0; i < numItems; i++) {
@@ -82,7 +142,7 @@ public void testLessFieldsThanHeaders() throws Exception {
82142
items.entrySet().stream().limit(3).forEach(e -> assertEquals(e.getValue(), ingestDocument.getFieldValue(e.getKey(), String.class)));
83143
}
84144

85-
public void testLessHeadersThanFields() throws Exception {
145+
public void testLessHeadersThanFields() {
86146
int numItems = randomIntBetween(5, 10);
87147
Map<String, String> items = new LinkedHashMap<>();
88148
for (int i = 0; i < numItems; i++) {
@@ -96,7 +156,7 @@ public void testLessHeadersThanFields() throws Exception {
96156
items.entrySet().stream().limit(3).forEach(e -> assertEquals(e.getValue(), ingestDocument.getFieldValue(e.getKey(), String.class)));
97157
}
98158

99-
public void testSingleField() throws Exception {
159+
public void testSingleField() {
100160
String[] headers = new String[]{randomAlphaOfLengthBetween(5, 10)};
101161
String value = randomAlphaOfLengthBetween(5, 10);
102162
String csv = quote + value + quote;
@@ -106,7 +166,7 @@ public void testSingleField() throws Exception {
106166
assertEquals(value, ingestDocument.getFieldValue(headers[0], String.class));
107167
}
108168

109-
public void testEscapedQuote() throws Exception {
169+
public void testEscapedQuote() {
110170
int numItems = randomIntBetween(2, 10);
111171
Map<String, String> items = new LinkedHashMap<>();
112172
for (int i = 0; i < numItems; i++) {
@@ -121,7 +181,7 @@ public void testEscapedQuote() throws Exception {
121181
items.forEach((key, value) -> assertEquals(value.replace(quote + quote, quote), ingestDocument.getFieldValue(key, String.class)));
122182
}
123183

124-
public void testQuotedStrings() throws Exception {
184+
public void testQuotedStrings() {
125185
assumeFalse("quote needed", quote.isEmpty());
126186
int numItems = randomIntBetween(2, 10);
127187
Map<String, String> items = new LinkedHashMap<>();
@@ -138,7 +198,7 @@ public void testQuotedStrings() throws Exception {
138198
String.class)));
139199
}
140200

141-
public void testEmptyFields() throws Exception {
201+
public void testEmptyFields() {
142202
int numItems = randomIntBetween(5, 10);
143203
Map<String, String> items = new LinkedHashMap<>();
144204
for (int i = 0; i < numItems; i++) {
@@ -167,7 +227,7 @@ public void testWrongStrings() throws Exception {
167227
expectThrows(IllegalArgumentException.class, () -> processDocument(new String[]{"a"}, "abc\rabc"));
168228
}
169229

170-
public void testQuotedWhitespaces() throws Exception {
230+
public void testQuotedWhitespaces() {
171231
assumeFalse("quote needed", quote.isEmpty());
172232
IngestDocument document = processDocument(new String[]{"a", "b", "c", "d"},
173233
" abc " + separator + " def" + separator + "ghi " + separator + " " + quote + " ooo " + quote);
@@ -177,7 +237,7 @@ public void testQuotedWhitespaces() throws Exception {
177237
assertEquals(" ooo ", document.getFieldValue("d", String.class));
178238
}
179239

180-
public void testUntrimmed() throws Exception {
240+
public void testUntrimmed() {
181241
assumeFalse("quote needed", quote.isEmpty());
182242
IngestDocument document = processDocument(new String[]{"a", "b", "c", "d", "e", "f"},
183243
" abc " + separator + " def" + separator + "ghi " + separator + " "
@@ -197,9 +257,9 @@ public void testIgnoreMissing() {
197257
if (ingestDocument.hasField(fieldName)) {
198258
ingestDocument.removeField(fieldName);
199259
}
200-
CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[]{"a"}, false, ',', '"', true);
260+
CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[]{"a"}, false, ',', '"', true, null);
201261
processor.execute(ingestDocument);
202-
CsvProcessor processor2 = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[]{"a"}, false, ',', '"', false);
262+
CsvProcessor processor2 = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[]{"a"}, false, ',', '"', false, null);
203263
expectThrows(IllegalArgumentException.class, () -> processor2.execute(ingestDocument));
204264
}
205265

@@ -209,24 +269,29 @@ public void testEmptyHeaders() throws Exception {
209269
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "abc,abc");
210270
HashMap<String, Object> metadata = new HashMap<>(ingestDocument.getSourceAndMetadata());
211271

212-
CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[0], false, ',', '"', false);
272+
CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[0], false, ',', '"', false, null);
213273

214274
processor.execute(ingestDocument);
215275

216276
assertEquals(metadata, ingestDocument.getSourceAndMetadata());
217277
}
218278

219-
private IngestDocument processDocument(String[] headers, String csv) throws Exception {
279+
private IngestDocument processDocument(String[] headers, String csv) {
220280
return processDocument(headers, csv, true);
221281
}
222282

223-
private IngestDocument processDocument(String[] headers, String csv, boolean trim) throws Exception {
283+
private IngestDocument processDocument(String[] headers, String csv, boolean trim) {
284+
return processDocument(headers, csv, trim, null);
285+
}
286+
287+
private IngestDocument processDocument(String[] headers, String csv, boolean trim, Object emptyValue) {
224288
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
225289
Arrays.stream(headers).filter(ingestDocument::hasField).forEach(ingestDocument::removeField);
226290

227291
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, csv);
228292
char quoteChar = quote.isEmpty() ? '"' : quote.charAt(0);
229-
CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, headers, trim, separator, quoteChar, false);
293+
CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, headers, trim, separator, quoteChar, false,
294+
emptyValue);
230295

231296
processor.execute(ingestDocument);
232297

0 commit comments

Comments
 (0)