Skip to content

Commit fe1fbd9

Browse files
committed
Add enrich processor
The enrich processor performs a lookup in a locally allocated enrich index shard using a field value from the document being enriched. If there is a match then the _source of the enrich document is fetched. The document being enriched then gets the decorate values from the enrich document based on the configured decorate fields in the pipeline. Note that the usage of the _source field is temporary until the enrich source field that is part of elastic#41521 is merged into the enrich branch. Using the _source field involves significant decompression which not desired for enrich use cases. The policy contains the information what field in the enrich index to query and what fields are available to decorate a document being enriched with. The enrich processor has the following configuration options: * `policy_name` - the name of the policy this processor should use * `enrich_key` - the field in the document being enriched that holds to lookup value * `enrich_key_ignore_missing` - Whether to allow the key field to be missing * `enrich_values` - a list of fields to decorate the document being enriched with. Each entry holds a source field and a target field. The source field indicates what decorate field to use that is available in the policy. The target field controls the field name to use in the document being enriched. The source and target fields can be the same. Example pipeline config: ``` { "processors": [ { "policy_name": "my_policy", "enrich_key": "host_name", "enrich_values": [ { "source": "globalRank", "target": "global_rank" } ] } ] } ``` In the above example documents are being enriched with a global rank value. For each document that has match in the enrich index based on its host_name field, the document gets an global rank field value, which is fetched from the `globalRank` field in the enrich index and saved as `global_rank` in the document being enriched. This is PR is part one of elastic#41521
1 parent 284c508 commit fe1fbd9

File tree

6 files changed

+644
-2
lines changed

6 files changed

+644
-2
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
*/
2929
public final class EnrichPolicy implements Writeable, ToXContentFragment {
3030

31-
static final String EXACT_MATCH_TYPE = "exact_match";
31+
public static final String EXACT_MATCH_TYPE = "exact_match";
3232
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{EXACT_MATCH_TYPE};
3333

3434
static final ParseField TYPE = new ParseField("type");

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.enrich;
77

88
import org.elasticsearch.cluster.metadata.MetaData;
9+
import org.elasticsearch.cluster.service.ClusterService;
910
import org.elasticsearch.common.ParseField;
1011
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1112
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -21,7 +22,8 @@ public class EnrichPlugin extends Plugin implements IngestPlugin {
2122

2223
@Override
2324
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
24-
return Collections.emptyMap();
25+
final ClusterService clusterService = parameters.ingestService.getClusterService();
26+
return Map.of(EnrichProcessorFactory.TYPE, new EnrichProcessorFactory(clusterService::state, parameters.localShardSearcher));
2527
}
2628

2729
@Override
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.enrich;
7+
8+
import org.elasticsearch.cluster.ClusterState;
9+
import org.elasticsearch.index.engine.Engine;
10+
import org.elasticsearch.ingest.ConfigurationUtils;
11+
import org.elasticsearch.ingest.Processor;
12+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
13+
14+
import java.util.List;
15+
import java.util.Map;
16+
import java.util.function.Function;
17+
import java.util.function.Supplier;
18+
import java.util.stream.Collectors;
19+
20+
final class EnrichProcessorFactory implements Processor.Factory {
21+
22+
static final String TYPE = "enrich";
23+
24+
private final Function<String, EnrichPolicy> policyLookup;
25+
private final Function<String, Engine.Searcher> searchProvider;
26+
27+
EnrichProcessorFactory(Supplier<ClusterState> clusterStateSupplier,
28+
Function<String, Engine.Searcher> searchProvider) {
29+
this.policyLookup = policyName -> {
30+
ClusterState clusterState = clusterStateSupplier.get();
31+
return EnrichStore.getPolicy(policyName, clusterState);
32+
};
33+
this.searchProvider = searchProvider;
34+
}
35+
36+
@Override
37+
public Processor create(Map<String, Processor.Factory> processorFactories, String tag, Map<String, Object> config) throws Exception {
38+
String policyName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "policy_name");
39+
EnrichPolicy policy = policyLookup.apply(policyName);
40+
if (policy == null) {
41+
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
42+
}
43+
44+
String enrichKey = ConfigurationUtils.readStringProperty(TYPE, tag, config, "enrich_key", policy.getEnrichKey());
45+
boolean ignoreKeyMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "enrich_key_ignore_missing", false);
46+
47+
final List<EnrichSpecification> specifications;
48+
final List<Map<?, ?>> specificationConfig = ConfigurationUtils.readList(TYPE, tag, config, "enrich_values");
49+
specifications = specificationConfig.stream()
50+
.map(entry -> new EnrichSpecification((String) entry.get("source"), (String) entry.get("target")))
51+
.collect(Collectors.toList());
52+
53+
for (EnrichSpecification specification : specifications) {
54+
if (policy.getEnrichValues().contains(specification.sourceField) == false) {
55+
throw new IllegalArgumentException("source field [" + specification.sourceField + "] does not exist in policy [" +
56+
policyName + "]");
57+
}
58+
}
59+
60+
switch (policy.getType()) {
61+
case EnrichPolicy.EXACT_MATCH_TYPE:
62+
return new ExactMatchProcessor(tag, policyLookup, searchProvider, policyName, enrichKey, ignoreKeyMissing, specifications);
63+
default:
64+
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
65+
}
66+
}
67+
68+
static final class EnrichSpecification {
69+
70+
final String sourceField;
71+
final String targetField;
72+
73+
EnrichSpecification(String sourceField, String targetField) {
74+
this.sourceField = sourceField;
75+
this.targetField = targetField;
76+
}
77+
}
78+
79+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.enrich;
7+
8+
import org.apache.lucene.document.Document;
9+
import org.apache.lucene.index.LeafReader;
10+
import org.apache.lucene.index.PostingsEnum;
11+
import org.apache.lucene.index.Terms;
12+
import org.apache.lucene.index.TermsEnum;
13+
import org.apache.lucene.util.BytesRef;
14+
import org.elasticsearch.common.bytes.BytesArray;
15+
import org.elasticsearch.common.bytes.BytesReference;
16+
import org.elasticsearch.common.xcontent.XContentHelper;
17+
import org.elasticsearch.common.xcontent.XContentType;
18+
import org.elasticsearch.index.engine.Engine;
19+
import org.elasticsearch.index.mapper.SourceFieldMapper;
20+
import org.elasticsearch.ingest.AbstractProcessor;
21+
import org.elasticsearch.ingest.IngestDocument;
22+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
23+
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Set;
27+
import java.util.function.Function;
28+
29+
final class ExactMatchProcessor extends AbstractProcessor {
30+
31+
private final Function<String, EnrichPolicy> policyLookup;
32+
private final Function<String, Engine.Searcher> searchProvider;
33+
34+
private final String policyName;
35+
private final String enrichKey;
36+
private final boolean ignoreEnrichKeyMissing;
37+
private final List<EnrichProcessorFactory.EnrichSpecification> specifications;
38+
39+
ExactMatchProcessor(String tag,
40+
Function<String, EnrichPolicy> policyLookup,
41+
Function<String, Engine.Searcher> searchProvider, String policyName,
42+
String enrichKey,
43+
boolean ignoreEnrichKeyMissing,
44+
List<EnrichProcessorFactory.EnrichSpecification> specifications) {
45+
super(tag);
46+
this.policyLookup = policyLookup;
47+
this.searchProvider = searchProvider;
48+
this.policyName = policyName;
49+
this.enrichKey = enrichKey;
50+
this.ignoreEnrichKeyMissing = ignoreEnrichKeyMissing;
51+
this.specifications = specifications;
52+
}
53+
54+
@Override
55+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
56+
final EnrichPolicy policy = policyLookup.apply(policyName);
57+
if (policy == null) {
58+
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
59+
}
60+
61+
final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreEnrichKeyMissing);
62+
if (value == null) {
63+
return ingestDocument;
64+
}
65+
66+
// TODO: re-use the engine searcher between enriching documents from the same write request
67+
try (Engine.Searcher engineSearcher = searchProvider.apply(policy.getIndexPattern())) {
68+
if (engineSearcher.getDirectoryReader().leaves().size() == 0) {
69+
return ingestDocument;
70+
} else if (engineSearcher.getDirectoryReader().leaves().size() != 1) {
71+
throw new IllegalStateException("enrich index must have exactly a single segment");
72+
}
73+
74+
final LeafReader leafReader = engineSearcher.getDirectoryReader().leaves().get(0).reader();
75+
final Terms terms = leafReader.terms(policy.getEnrichKey());
76+
if (terms == null) {
77+
throw new IllegalStateException("enrich key field [" + policy.getEnrichKey() + "] does not exist");
78+
}
79+
80+
final TermsEnum tenum = terms.iterator();
81+
if (tenum.seekExact(new BytesRef(value))) {
82+
PostingsEnum penum = tenum.postings(null, PostingsEnum.NONE);
83+
final int docId = penum.nextDoc();
84+
assert docId != PostingsEnum.NO_MORE_DOCS : "no matching doc id for [" + enrichKey + "]";
85+
assert penum.nextDoc() == PostingsEnum.NO_MORE_DOCS : "more than one doc id matching for [" + enrichKey + "]";
86+
87+
// TODO: The use of _source is temporarily until enrich source field mapper has been added (see PR #41521)
88+
Document document = leafReader.document(docId, Set.of(SourceFieldMapper.NAME));
89+
BytesRef source = document.getBinaryValue(SourceFieldMapper.NAME);
90+
assert source != null;
91+
92+
final BytesReference encoded = new BytesArray(source);
93+
final Map<String, Object> decoded =
94+
XContentHelper.convertToMap(encoded, false, XContentType.SMILE).v2();
95+
for (EnrichProcessorFactory.EnrichSpecification specification : specifications) {
96+
Object enrichValue = decoded.get(specification.sourceField);
97+
ingestDocument.setFieldValue(specification.targetField, enrichValue);
98+
}
99+
}
100+
}
101+
return ingestDocument;
102+
}
103+
104+
@Override
105+
public String getType() {
106+
return EnrichProcessorFactory.TYPE;
107+
}
108+
109+
String getPolicyName() {
110+
return policyName;
111+
}
112+
113+
String getEnrichKey() {
114+
return enrichKey;
115+
}
116+
117+
boolean isIgnoreEnrichKeyMissing() {
118+
return ignoreEnrichKeyMissing;
119+
}
120+
121+
List<EnrichProcessorFactory.EnrichSpecification> getSpecifications() {
122+
return specifications;
123+
}
124+
}

0 commit comments

Comments
 (0)