Skip to content

Commit b65513e

Browse files
authored
Keep track of the enrich key field in the enrich index. (#42022)
The enrich key field is being kept track in _meta field by the policy runner. The ingest processor uses the field name defined in enrich index _meta field and not in the policy. This will avoid problems if policy is changed without a new enrich index being created. This also complete decouples EnrichPolicy from ExactMatchProcessor. The following scenario results in failure without this change: 1) Create policy 2) Execute policy 3) Create pipeline with enrich processor 4) Use pipeline 5) Update enrich key in policy 6) Use pipeline, which then fails.
1 parent 28c529f commit b65513e

File tree

9 files changed

+135
-84
lines changed

9 files changed

+135
-84
lines changed

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.cluster.ClusterChangedEvent;
3737
import org.elasticsearch.cluster.ClusterState;
3838
import org.elasticsearch.cluster.ClusterStateApplier;
39+
import org.elasticsearch.cluster.metadata.IndexMetaData;
3940
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
4041
import org.elasticsearch.cluster.metadata.MetaData;
4142
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -127,7 +128,8 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool,
127128
}
128129

129130
IndexShard indexShard = indexService.getShard(0);
130-
return indexShard.acquireSearcher("ingest");
131+
IndexMetaData imd = state.metaData().index(index);
132+
return new Tuple<>(imd, indexShard.acquireSearcher("ingest"));
131133
}
132134
)
133135
);

server/src/main/java/org/elasticsearch/ingest/Processor.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.elasticsearch.ingest;
2121

22+
import org.elasticsearch.cluster.metadata.IndexMetaData;
23+
import org.elasticsearch.common.collect.Tuple;
2224
import org.elasticsearch.common.util.concurrent.ThreadContext;
2325
import org.elasticsearch.env.Environment;
2426
import org.elasticsearch.index.analysis.AnalysisRegistry;
@@ -114,14 +116,16 @@ class Parameters {
114116

115117
/**
116118
* Provides access to an engine searcher of a locally allocated index specified for the provided index.
119+
* The input of this function is an index expression and this function returns the {@link IndexMetaData}
120+
* of the resolved locally allocated index and {@link Engine.Searcher} instance for the resolved index.
117121
*
118122
* The locally allocated index must be have a single primary shard.
119123
*/
120-
public final Function<String, Engine.Searcher> localShardSearcher;
124+
public final Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher;
121125

122126
public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
123127
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
124-
IngestService ingestService, Function<String, Engine.Searcher> localShardSearcher) {
128+
IngestService ingestService, Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher) {
125129
this.env = env;
126130
this.scriptService = scriptService;
127131
this.threadContext = threadContext;

server/src/test/java/org/elasticsearch/ingest/IngestLocalShardSearcherTests.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import org.elasticsearch.action.get.GetRequest;
3030
import org.elasticsearch.action.index.IndexRequest;
3131
import org.elasticsearch.action.ingest.PutPipelineRequest;
32+
import org.elasticsearch.cluster.metadata.IndexMetaData;
3233
import org.elasticsearch.common.bytes.BytesReference;
34+
import org.elasticsearch.common.collect.Tuple;
3335
import org.elasticsearch.common.settings.Settings;
3436
import org.elasticsearch.common.xcontent.XContentType;
3537
import org.elasticsearch.index.IndexModule;
@@ -180,17 +182,17 @@ static class TestProcessor extends AbstractProcessor {
180182

181183
static final String NAME = "test_processor";
182184

183-
private final Function<String, Engine.Searcher> localShardSearcher;
185+
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher;
184186

185-
TestProcessor(String tag, Function<String, Engine.Searcher> localShardSearcher) {
187+
TestProcessor(String tag, Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher) {
186188
super(tag);
187189
this.localShardSearcher = localShardSearcher;
188190
}
189191

190192
@Override
191193
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
192194
String indexExpression = "reference-index";
193-
try (Engine.Searcher engineSearcher = localShardSearcher.apply(indexExpression)) {
195+
try (Engine.Searcher engineSearcher = localShardSearcher.apply(indexExpression).v2()) {
194196
// Ensure that search wrapper has been invoked by checking the directory instance type:
195197
if ((engineSearcher.getDirectoryReader() instanceof TestDirectyReader) == false) {
196198
// asserting or throwing a AssertionError makes this test hang:
@@ -210,9 +212,9 @@ public String getType() {
210212

211213
static class Factory implements Processor.Factory {
212214

213-
private final Function<String, Engine.Searcher> localShardSearcher;
215+
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher;
214216

215-
Factory(Function<String, Engine.Searcher> localShardSearcher) {
217+
Factory(Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher) {
216218
this.localShardSearcher = localShardSearcher;
217219
}
218220

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public String getSchedule() {
132132
return schedule;
133133
}
134134

135-
public String getBaseName(String policyName) {
135+
public static String getBaseName(String policyName) {
136136
return ENRICH_INDEX_NAME_BASE + policyName;
137137
}
138138

x-pack/plugin/enrich/qa/rest/src/test/java/org/elasticsearch/xpack/enrich/EnrichIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.http.util.EntityUtils;
1111
import org.elasticsearch.client.Request;
1212
import org.elasticsearch.client.Response;
13+
import org.elasticsearch.common.settings.Settings;
1314
import org.elasticsearch.common.xcontent.XContentBuilder;
1415
import org.elasticsearch.common.xcontent.XContentHelper;
1516
import org.elasticsearch.common.xcontent.XContentType;
@@ -45,6 +46,10 @@ public void testBasicFlow() throws Exception {
4546
"\"enrich_values\": [\"globalRank\", \"tldRank\", \"tld\"], \"schedule\": \"0 5 * * *\"}");
4647
assertOK(client().performRequest(putPolicyRequest));
4748

49+
// create index (remove when execute policy api has been added)
50+
String mapping = "\"_meta\": {\"enrich_key_field\": \"host\"}";
51+
createIndex(".enrich-my_policy", Settings.EMPTY, mapping);
52+
4853
// Add a single enrich document for now and then refresh:
4954
Request indexRequest = new Request("PUT", "/.enrich-my_policy/_doc/elastic.co");
5055
XContentBuilder document = XContentBuilder.builder(XContentType.SMILE.xContent());

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import org.elasticsearch.search.builder.SearchSourceBuilder;
4646
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
4747

48+
import static org.elasticsearch.xpack.enrich.ExactMatchProcessor.ENRICH_KEY_FIELD_NAME;
49+
4850
public class EnrichPolicyRunner implements Runnable {
4951

5052
private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class);
@@ -145,6 +147,9 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
145147
.field("doc_values", false)
146148
.endObject()
147149
.endObject()
150+
.startObject("_meta")
151+
.field(ENRICH_KEY_FIELD_NAME, policy.getEnrichKey())
152+
.endObject()
148153
.endObject()
149154
.endObject();
150155

@@ -156,7 +161,7 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
156161

157162
private void prepareAndCreateEnrichIndex() {
158163
long nowTimestamp = nowSupplier.getAsLong();
159-
String enrichIndexName = policy.getBaseName(policyName) + "-" + nowTimestamp;
164+
String enrichIndexName = EnrichPolicy.getBaseName(policyName) + "-" + nowTimestamp;
160165
Settings enrichIndexSettings = Settings.builder()
161166
.put("index.auto_expand_replicas", "0-all")
162167
.build();
@@ -231,7 +236,7 @@ public void onFailure(Exception e) {
231236
}
232237

233238
private void updateEnrichPolicyAlias(final String destinationIndexName) {
234-
String enrichIndexBase = policy.getBaseName(policyName);
239+
String enrichIndexBase = EnrichPolicy.getBaseName(policyName);
235240
logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase);
236241
GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase);
237242
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), aliasRequest);

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
package org.elasticsearch.xpack.enrich;
77

88
import org.elasticsearch.cluster.ClusterState;
9+
import org.elasticsearch.cluster.metadata.IndexMetaData;
10+
import org.elasticsearch.common.collect.Tuple;
911
import org.elasticsearch.index.engine.Engine;
1012
import org.elasticsearch.ingest.ConfigurationUtils;
1113
import org.elasticsearch.ingest.Processor;
@@ -22,10 +24,10 @@ final class EnrichProcessorFactory implements Processor.Factory {
2224
static final String TYPE = "enrich";
2325

2426
private final Function<String, EnrichPolicy> policyLookup;
25-
private final Function<String, Engine.Searcher> searchProvider;
27+
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider;
2628

2729
EnrichProcessorFactory(Supplier<ClusterState> clusterStateSupplier,
28-
Function<String, Engine.Searcher> searchProvider) {
30+
Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider) {
2931
this.policyLookup = policyName -> EnrichStore.getPolicy(policyName, clusterStateSupplier.get());
3032
this.searchProvider = searchProvider;
3133
}
@@ -57,7 +59,7 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
5759

5860
switch (policy.getType()) {
5961
case EnrichPolicy.EXACT_MATCH_TYPE:
60-
return new ExactMatchProcessor(tag, policyLookup, searchProvider, policyName, enrichKey, ignoreMissing, specifications);
62+
return new ExactMatchProcessor(tag, searchProvider, policyName, enrichKey, ignoreMissing, specifications);
6163
default:
6264
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
6365
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111
import org.apache.lucene.index.Terms;
1212
import org.apache.lucene.index.TermsEnum;
1313
import org.apache.lucene.util.BytesRef;
14+
import org.elasticsearch.cluster.metadata.IndexMetaData;
1415
import org.elasticsearch.common.bytes.BytesArray;
1516
import org.elasticsearch.common.bytes.BytesReference;
17+
import org.elasticsearch.common.collect.Tuple;
1618
import org.elasticsearch.common.xcontent.XContentHelper;
1719
import org.elasticsearch.common.xcontent.XContentType;
1820
import org.elasticsearch.index.engine.Engine;
@@ -29,23 +31,22 @@
2931

3032
final class ExactMatchProcessor extends AbstractProcessor {
3133

32-
private final Function<String, EnrichPolicy> policyLookup;
33-
private final Function<String, Engine.Searcher> searchProvider;
34+
static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field";
35+
36+
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider;
3437

3538
private final String policyName;
3639
private final String enrichKey;
3740
private final boolean ignoreMissing;
3841
private final List<EnrichSpecification> specifications;
3942

4043
ExactMatchProcessor(String tag,
41-
Function<String, EnrichPolicy> policyLookup,
42-
Function<String, Engine.Searcher> searchProvider,
44+
Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider,
4345
String policyName,
4446
String enrichKey,
4547
boolean ignoreMissing,
4648
List<EnrichSpecification> specifications) {
4749
super(tag);
48-
this.policyLookup = policyLookup;
4950
this.searchProvider = searchProvider;
5051
this.policyName = policyName;
5152
this.enrichKey = enrichKey;
@@ -55,28 +56,26 @@ final class ExactMatchProcessor extends AbstractProcessor {
5556

5657
@Override
5758
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
58-
final EnrichPolicy policy = policyLookup.apply(policyName);
59-
if (policy == null) {
60-
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
61-
}
62-
6359
final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing);
6460
if (value == null) {
6561
return ingestDocument;
6662
}
6763

6864
// TODO: re-use the engine searcher between enriching documents from the same write request
69-
try (Engine.Searcher engineSearcher = searchProvider.apply(policy.getBaseName(policyName))) {
65+
Tuple<IndexMetaData, Engine.Searcher> tuple = searchProvider.apply(EnrichPolicy.getBaseName(policyName));
66+
String enrichKeyField = getEnrichKeyField(tuple.v1());
67+
68+
try (Engine.Searcher engineSearcher = tuple.v2()) {
7069
if (engineSearcher.getDirectoryReader().leaves().size() == 0) {
7170
return ingestDocument;
7271
} else if (engineSearcher.getDirectoryReader().leaves().size() != 1) {
7372
throw new IllegalStateException("enrich index must have exactly a single segment");
7473
}
7574

7675
final LeafReader leafReader = engineSearcher.getDirectoryReader().leaves().get(0).reader();
77-
final Terms terms = leafReader.terms(policy.getEnrichKey());
76+
final Terms terms = leafReader.terms(enrichKeyField);
7877
if (terms == null) {
79-
throw new IllegalStateException("enrich key field [" + policy.getEnrichKey() + "] does not exist");
78+
throw new IllegalStateException("enrich key field does not exist");
8079
}
8180

8281
final TermsEnum tenum = terms.iterator();
@@ -124,4 +123,22 @@ boolean isIgnoreMissing() {
124123
List<EnrichSpecification> getSpecifications() {
125124
return specifications;
126125
}
126+
127+
private static String getEnrichKeyField(IndexMetaData imd) {
128+
if (imd == null) {
129+
throw new IllegalStateException("enrich index is missing");
130+
}
131+
132+
Map<String, Object> mappingSource = imd.mapping().getSourceAsMap();
133+
Map<?, ?> meta = (Map<?, ?>) mappingSource.get("_meta");
134+
if (meta == null) {
135+
throw new IllegalStateException("_meta field is missing in enrich index");
136+
}
137+
138+
String fieldName = (String) meta.get(ENRICH_KEY_FIELD_NAME);
139+
if (fieldName == null) {
140+
throw new IllegalStateException("enrich key fieldname missing");
141+
}
142+
return fieldName;
143+
}
127144
}

0 commit comments

Comments
 (0)