Skip to content

Commit c0056cd

Browse files
authored
ingest: Introduction of a bytes processor (#31733)
ingest: Introduction of a bytes processor This processor allows for human readable byte values (e.g. 1kb) to be converted to value in bytes (e.g. 1024). Internally this processor re-uses "ByteSizeValue.parseBytesSizeValue" which supports conversions up to Long.MAX_VALUE and the following units: "b", "kb", "mb", "gb", "tb", pb". This change also introduces a generic return type for the AbstractStringProcessor to allow for code reuse while supporting a String -> T conversion. (String -> Long in this case).
1 parent 396c578 commit c0056cd

File tree

9 files changed

+269
-7
lines changed

9 files changed

+269
-7
lines changed

docs/reference/ingest/ingest-node.asciidoc

+27
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,33 @@ Accepts a single value or an array of values.
773773
--------------------------------------------------
774774
// NOTCONSOLE
775775

776+
[[bytes-processor]]
777+
=== Bytes Processor
778+
Converts a human readable byte value (e.g. 1kb) to its value in bytes (e.g. 1024).
779+
780+
Supported human readable units are "b", "kb", "mb", "gb", "tb", "pb" case insensitive. An error will occur if
781+
the field is not a supported format or resultant value exceeds 2^63.
782+
783+
[[bytes-options]]
784+
.Bytes Options
785+
[options="header"]
786+
|======
787+
| Name | Required | Default | Description
788+
| `field` | yes | - | The field to convert
789+
| `target_field` | no | `field` | The field to assign the converted value to, by default `field` is updated in-place
790+
| `ignore_missing` | no | `false` | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
791+
|======
792+
793+
[source,js]
794+
--------------------------------------------------
795+
{
796+
"bytes": {
797+
"field": "foo"
798+
}
799+
}
800+
--------------------------------------------------
801+
// NOTCONSOLE
802+
776803
[[convert-processor]]
777804
=== Convert Processor
778805
Converts an existing field's value to a different type, such as converting a string to an integer.

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@
2727
import java.util.Map;
2828

2929
/**
30-
* Base class for processors that manipulate strings and require a single "fields" array config value, which
30+
* Base class for processors that manipulate source strings and require a single "fields" array config value, which
3131
* holds a list of field names in string format.
32+
*
33+
* @param <T> The resultant type for the target field
3234
*/
33-
abstract class AbstractStringProcessor extends AbstractProcessor {
35+
abstract class AbstractStringProcessor<T> extends AbstractProcessor {
3436
private final String field;
3537
private final boolean ignoreMissing;
3638
private final String targetField;
@@ -67,7 +69,7 @@ public final void execute(IngestDocument document) {
6769
document.setFieldValue(targetField, process(val));
6870
}
6971

70-
protected abstract String process(String value);
72+
protected abstract T process(String value);
7173

7274
abstract static class Factory implements Processor.Factory {
7375
final String processorType;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.ingest.common;
21+
22+
import org.elasticsearch.common.unit.ByteSizeValue;
23+
24+
import java.util.Map;
25+
26+
/**
27+
* Processor that converts the content of string fields to the byte value.
28+
* Throws exception is the field is not of type string or can not convert to the numeric byte value
29+
*/
30+
public final class BytesProcessor extends AbstractStringProcessor {
31+
32+
public static final String TYPE = "bytes";
33+
34+
BytesProcessor(String processorTag, String field, boolean ignoreMissing, String targetField) {
35+
super(processorTag, field, ignoreMissing, targetField);
36+
}
37+
38+
@Override
39+
protected Long process(String value) {
40+
return ByteSizeValue.parseBytesSizeValue(value, null, getField()).getBytes();
41+
}
42+
43+
@Override
44+
public String getType() {
45+
return TYPE;
46+
}
47+
48+
public static final class Factory extends AbstractStringProcessor.Factory {
49+
50+
public Factory() {
51+
super(TYPE);
52+
}
53+
54+
@Override
55+
protected BytesProcessor newProcessor(String tag, Map<String, Object> config, String field,
56+
boolean ignoreMissing, String targetField) {
57+
return new BytesProcessor(tag, field, ignoreMissing, targetField);
58+
}
59+
}
60+
}

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

+1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
8181
processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory());
8282
processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory());
8383
processors.put(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory());
84+
processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory());
8485
return Collections.unmodifiableMap(processors);
8586
}
8687

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AbstractStringProcessorTestCase.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -31,23 +31,27 @@
3131
import static org.hamcrest.Matchers.containsString;
3232
import static org.hamcrest.Matchers.equalTo;
3333

34-
public abstract class AbstractStringProcessorTestCase extends ESTestCase {
34+
public abstract class AbstractStringProcessorTestCase<T> extends ESTestCase {
3535

3636
protected abstract AbstractStringProcessor newProcessor(String field, boolean ignoreMissing, String targetField);
3737

3838
protected String modifyInput(String input) {
3939
return input;
4040
}
4141

42-
protected abstract String expectedResult(String input);
42+
protected abstract T expectedResult(String input);
43+
44+
protected Class<T> expectedResultType(){
45+
return (Class<T>) String.class; // most results types are Strings
46+
}
4347

4448
public void testProcessor() throws Exception {
4549
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
4650
String fieldValue = RandomDocumentPicks.randomString(random());
4751
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, modifyInput(fieldValue));
4852
Processor processor = newProcessor(fieldName, randomBoolean(), fieldName);
4953
processor.execute(ingestDocument);
50-
assertThat(ingestDocument.getFieldValue(fieldName, String.class), equalTo(expectedResult(fieldValue)));
54+
assertThat(ingestDocument.getFieldValue(fieldName, expectedResultType()), equalTo(expectedResult(fieldValue)));
5155
}
5256

5357
public void testFieldNotFound() throws Exception {
@@ -109,6 +113,6 @@ public void testTargetField() throws Exception {
109113
String targetFieldName = fieldName + "foo";
110114
Processor processor = newProcessor(fieldName, randomBoolean(), targetFieldName);
111115
processor.execute(ingestDocument);
112-
assertThat(ingestDocument.getFieldValue(targetFieldName, String.class), equalTo(expectedResult(fieldValue)));
116+
assertThat(ingestDocument.getFieldValue(targetFieldName, expectedResultType()), equalTo(expectedResult(fieldValue)));
113117
}
114118
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.ingest.common;
21+
22+
public class BytesProcessorFactoryTests extends AbstractStringProcessorFactoryTestCase {
23+
@Override
24+
protected AbstractStringProcessor.Factory newFactory() {
25+
return new BytesProcessor.Factory();
26+
}
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.ingest.common;
21+
22+
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.common.unit.ByteSizeUnit;
24+
import org.elasticsearch.common.unit.ByteSizeValue;
25+
import org.elasticsearch.ingest.IngestDocument;
26+
import org.elasticsearch.ingest.Processor;
27+
import org.elasticsearch.ingest.RandomDocumentPicks;
28+
import org.hamcrest.CoreMatchers;
29+
30+
import static org.hamcrest.Matchers.equalTo;
31+
32+
public class BytesProcessorTests extends AbstractStringProcessorTestCase {
33+
34+
private String modifiedInput;
35+
36+
@Override
37+
protected AbstractStringProcessor newProcessor(String field, boolean ignoreMissing, String targetField) {
38+
return new BytesProcessor(randomAlphaOfLength(10), field, ignoreMissing, targetField);
39+
}
40+
41+
@Override
42+
protected String modifyInput(String input) {
43+
//largest value that allows all results < Long.MAX_VALUE bytes
44+
long randomNumber = randomLongBetween(1, Long.MAX_VALUE / ByteSizeUnit.PB.toBytes(1));
45+
ByteSizeUnit randomUnit = randomFrom(ByteSizeUnit.values());
46+
modifiedInput = randomNumber + randomUnit.getSuffix();
47+
return modifiedInput;
48+
}
49+
50+
@Override
51+
protected Long expectedResult(String input) {
52+
return ByteSizeValue.parseBytesSizeValue(modifiedInput, null, "").getBytes();
53+
}
54+
55+
@Override
56+
protected Class<Long> expectedResultType() {
57+
return Long.class;
58+
}
59+
60+
public void testTooLarge() {
61+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
62+
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "8912pb");
63+
Processor processor = newProcessor(fieldName, randomBoolean(), fieldName);
64+
ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> processor.execute(ingestDocument));
65+
assertThat(exception.getMessage(),
66+
CoreMatchers.equalTo("failed to parse setting [" + fieldName + "] with value [8912pb] as a size in bytes"));
67+
assertThat(exception.getCause().getMessage(),
68+
CoreMatchers.containsString("Values greater than 9223372036854775807 bytes are not supported"));
69+
}
70+
71+
public void testNotBytes() {
72+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
73+
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "junk");
74+
Processor processor = newProcessor(fieldName, randomBoolean(), fieldName);
75+
ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> processor.execute(ingestDocument));
76+
assertThat(exception.getMessage(),
77+
CoreMatchers.equalTo("failed to parse [junk]"));
78+
}
79+
80+
public void testMissingUnits() {
81+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
82+
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "1");
83+
Processor processor = newProcessor(fieldName, randomBoolean(), fieldName);
84+
ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> processor.execute(ingestDocument));
85+
assertThat(exception.getMessage(),
86+
CoreMatchers.containsString("unit is missing or unrecognized"));
87+
}
88+
89+
public void testFractional() throws Exception {
90+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
91+
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "1.1kb");
92+
Processor processor = newProcessor(fieldName, randomBoolean(), fieldName);
93+
processor.execute(ingestDocument);
94+
assertThat(ingestDocument.getFieldValue(fieldName, expectedResultType()), equalTo(1126L));
95+
assertWarnings("Fractional bytes values are deprecated. Use non-fractional bytes values instead: [1.1kb] found for setting " +
96+
"[" + fieldName + "]");
97+
}
98+
}

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/10_basic.yml

+1
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,4 @@
3030
- contains: { nodes.$master.ingest.processors: { type: split } }
3131
- contains: { nodes.$master.ingest.processors: { type: trim } }
3232
- contains: { nodes.$master.ingest.processors: { type: uppercase } }
33+
- contains: { nodes.$master.ingest.processors: { type: bytes } }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
---
2+
teardown:
3+
- do:
4+
ingest.delete_pipeline:
5+
id: "my_pipeline"
6+
ignore: 404
7+
8+
---
9+
"Test bytes processor":
10+
- do:
11+
ingest.put_pipeline:
12+
id: "my_pipeline"
13+
body: >
14+
{
15+
"description": "_description",
16+
"processors": [
17+
{
18+
"bytes" : {
19+
"field" : "bytes_source_field",
20+
"target_field" : "bytes_target_field"
21+
}
22+
}
23+
]
24+
}
25+
- match: { acknowledged: true }
26+
27+
- do:
28+
index:
29+
index: test
30+
type: test
31+
id: 1
32+
pipeline: "my_pipeline"
33+
body: {bytes_source_field: "1kb"}
34+
35+
- do:
36+
get:
37+
index: test
38+
type: test
39+
id: 1
40+
- match: { _source.bytes_source_field: "1kb" }
41+
- match: { _source.bytes_target_field: 1024 }
42+

0 commit comments

Comments
 (0)