Skip to content

add exclude_keys option to KeyValueProcessor #24876

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1621,6 +1621,7 @@ For example, if you have a log message which contains `ip=1.2.3.4 error=REFUSED`
| `value_split` | yes | - | Regex pattern to use for splitting the key from the value within a key-value pair
| `target_field` | no | `null` | The field to insert the extracted keys into. Defaults to the root of the document
| `include_keys` | no | `null` | List of keys to filter and insert into document. Defaults to including all keys
| `exclude_keys` | no | `null` | List of keys to exclude from the document. Defaults to excluding no keys
| `ignore_missing` | no | `false` | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
|======

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* The KeyValueProcessor parses messages of the `key=value` variety and writes each value into a field named after its key.
Expand All @@ -39,18 +42,20 @@ public final class KeyValueProcessor extends AbstractProcessor {
private final String field;
private final String fieldSplit;
private final String valueSplit;
private final List<String> includeKeys;
private final Set<String> includeKeys;
private final Set<String> excludeKeys;
private final String targetField;
private final boolean ignoreMissing;

KeyValueProcessor(String tag, String field, String fieldSplit, String valueSplit, List<String> includeKeys,
String targetField, boolean ignoreMissing) {
KeyValueProcessor(String tag, String field, String fieldSplit, String valueSplit, Set<String> includeKeys,
Set<String> excludeKeys, String targetField, boolean ignoreMissing) {
super(tag);
this.field = field;
this.targetField = targetField;
this.fieldSplit = fieldSplit;
this.valueSplit = valueSplit;
this.includeKeys = includeKeys;
this.excludeKeys = excludeKeys;
this.ignoreMissing = ignoreMissing;
}

Expand All @@ -66,10 +71,14 @@ String getValueSplit() {
return valueSplit;
}

List<String> getIncludeKeys() {
Set<String> getIncludeKeys() {
return includeKeys;
}

/**
 * Returns the exclude-key set this processor was constructed with.
 * May be {@code null}, in which case the key filter in {@code execute} excludes nothing.
 */
Set<String> getExcludeKeys() {
return excludeKeys;
}

String getTargetField() {
return targetField;
}
Expand Down Expand Up @@ -105,7 +114,9 @@ public void execute(IngestDocument document) {
}
return kv;
})
.filter((p) -> includeKeys == null || includeKeys.contains(p[0]))
.filter((p) ->
(includeKeys == null || includeKeys.contains(p[0])) &&
(excludeKeys == null || excludeKeys.contains(p[0]) == false))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those `contains` calls have linear complexity because we are using lists. Could we use hash sets instead (and maybe even automata as a follow-up)?

Copy link
Contributor Author

@talevy talevy May 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I had assumed the cost would be minimal, since this list is not expected to hold more than a few items, but that cost is multiplied across millions of documents, so good call.

updated

.forEach((p) -> append(document, fieldPathPrefix + p[0], p[1]));
}

Expand All @@ -122,12 +133,18 @@ public KeyValueProcessor create(Map<String, Processor.Factory> registry, String
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
List<String> includeKeys = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys");
if (includeKeys != null) {
includeKeys = Collections.unmodifiableList(includeKeys);
Set<String> includeKeys = null;
Set<String> excludeKeys = null;
List<String> includeKeysList = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys");
if (includeKeysList != null) {
includeKeys = Collections.unmodifiableSet(Sets.newHashSet(includeKeysList));
}
List<String> excludeKeysList = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "exclude_keys");
if (excludeKeysList != null) {
excludeKeys = Collections.unmodifiableSet(Sets.newHashSet(excludeKeysList));
}
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
return new KeyValueProcessor(processorTag, field, fieldSplit, valueSplit, includeKeys, targetField, ignoreMissing);
return new KeyValueProcessor(processorTag, field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -58,14 +60,16 @@ public void testCreateWithAllFieldsSet() throws Exception {
config.put("value_split", "=");
config.put("target_field", "target");
config.put("include_keys", Arrays.asList("a", "b"));
config.put("exclude_keys", Collections.emptyList());
config.put("ignore_missing", true);
String processorTag = randomAlphaOfLength(10);
KeyValueProcessor processor = factory.create(null, processorTag, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("field1"));
assertThat(processor.getFieldSplit(), equalTo("&"));
assertThat(processor.getValueSplit(), equalTo("="));
assertThat(processor.getIncludeKeys(), equalTo(Arrays.asList("a", "b")));
assertThat(processor.getIncludeKeys(), equalTo(Sets.newHashSet("a", "b")));
assertThat(processor.getExcludeKeys(), equalTo(Collections.emptySet()));
assertThat(processor.getTargetField(), equalTo("target"));
assertTrue(processor.isIgnoreMissing());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
Expand All @@ -36,7 +37,7 @@ public class KeyValueProcessorTests extends ESTestCase {
public void test() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", null, "target", false);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", null, null, "target", false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
assertThat(ingestDocument.getFieldValue("target.second", List.class), equalTo(Arrays.asList("world", "universe")));
Expand All @@ -45,7 +46,7 @@ public void test() throws Exception {
public void testRootTarget() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
ingestDocument.setFieldValue("myField", "first=hello&second=world&second=universe");
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "myField", "&", "=", null, null, false);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "myField", "&", "=", null, null,null, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("first", String.class), equalTo("hello"));
assertThat(ingestDocument.getFieldValue("second", List.class), equalTo(Arrays.asList("world", "universe")));
Expand All @@ -54,7 +55,7 @@ public void testRootTarget() throws Exception {
public void testKeySameAsSourceField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
ingestDocument.setFieldValue("first", "first=hello");
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "first", "&", "=", null, null, false);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "first", "&", "=", null, null,null, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("first", List.class), equalTo(Arrays.asList("first=hello", "hello")));
}
Expand All @@ -63,15 +64,38 @@ public void testIncludeKeys() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=",
Collections.singletonList("first"), "target", false);
Sets.newHashSet("first"), null, "target", false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
assertFalse(ingestDocument.hasField("target.second"));
}

/**
 * A key listed in the exclude set must not be written under the target field, while a key
 * that is not listed is still extracted.
 */
public void testExcludeKeys() throws Exception {
    final IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random());
    final String sourceField = RandomDocumentPicks.addRandomField(random(), doc, "first=hello&second=world&second=universe");
    // Draw the tag after the document and field so random values are consumed in the original order.
    final String tag = randomAlphaOfLength(10);

    // No include filter (null); only "second" is excluded.
    final Processor kv = new KeyValueProcessor(tag, sourceField, "&", "=", null, Sets.newHashSet("second"), "target", false);
    kv.execute(doc);

    final String extractedFirst = doc.getFieldValue("target.first", String.class);
    assertThat(extractedFirst, equalTo("hello"));
    assertFalse(doc.hasField("target.second"));
}

/**
 * When the same keys appear in both the include and the exclude set, none of them is written:
 * "first" and "second" are excluded, and "third" is never included.
 */
public void testIncludeAndExcludeKeys() throws Exception {
    final IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random());
    final String sourceField = RandomDocumentPicks.addRandomField(
        random(), doc, "first=hello&second=world&second=universe&third=bar");
    // Draw the tag after the document and field so random values are consumed in the original order.
    final String tag = randomAlphaOfLength(10);

    final Processor kv = new KeyValueProcessor(
        tag, sourceField, "&", "=",
        Sets.newHashSet("first", "second"),   // include
        Sets.newHashSet("first", "second"),   // exclude
        "target", false);
    kv.execute(doc);

    // Checked in the same order as before: first, second, third.
    for (String path : Arrays.asList("target.first", "target.second", "target.third")) {
        assertFalse(doc.hasField(path));
    }
}

public void testMissingField() {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "unknown", "&", "=", null, "target", false);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "unknown", "&",
"=", null, null, "target", false);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field [unknown] not present as part of path [unknown]"));
}
Expand All @@ -81,31 +105,31 @@ public void testNullValueWithIgnoreMissing() throws Exception {
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
Collections.singletonMap(fieldName, null));
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "", "", null, "target", true);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "", "", null, null, "target", true);
processor.execute(ingestDocument);
assertIngestDocument(originalIngestDocument, ingestDocument);
}

public void testNonExistentWithIgnoreMissing() throws Exception {
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "unknown", "", "", null, "target", true);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "unknown", "", "", null, null, "target", true);
processor.execute(ingestDocument);
assertIngestDocument(originalIngestDocument, ingestDocument);
}

public void testFailFieldSplitMatch() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello|second=world|second=universe");
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", null, "target", false);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", null, null, "target", false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello|second=world|second=universe"));
assertFalse(ingestDocument.hasField("target.second"));
}

public void testFailValueSplitMatch() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("foo", "bar"));
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "foo", "&", "=", null, "target", false);
Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "foo", "&", "=", null, null, "target", false);
Exception exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field [foo] does not contain value_split [=]"));
}
Expand Down