Keep track of the enrich key field in the enrich index. #42022

Merged
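
In short: EnrichPolicyRunner now writes the policy's enrich key field name into the _meta section of the enrich index mapping, and ExactMatchProcessor reads that field name back from the resolved index's IndexMetaData instead of looking up the enrich policy at enrich time. Below is a minimal, illustrative sketch (not code from this PR) of just that _meta entry; "host" stands in for policy.getEnrichKey() and the rest of the mapping is omitted.

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

public class EnrichMetaMappingSketch {

    public static void main(String[] args) throws Exception {
        // Build only the _meta portion of the enrich index mapping; resolveEnrichMapping
        // in this PR adds the same entry alongside the enrich key field mapping itself.
        XContentBuilder mapping = XContentFactory.jsonBuilder()
            .startObject()
                .startObject("_meta")
                    .field("enrich_key_field", "host") // ExactMatchProcessor.ENRICH_KEY_FIELD_NAME
                .endObject()
            .endObject();

        // Prints: {"_meta":{"enrich_key_field":"host"}}
        System.out.println(Strings.toString(mapping));
    }
}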
@@ -36,6 +36,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -127,7 +128,8 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool,
}

IndexShard indexShard = indexService.getShard(0);
return indexShard.acquireSearcher("ingest");
IndexMetaData imd = state.metaData().index(index);
return new Tuple<>(imd, indexShard.acquireSearcher("ingest"));
}
)
);
8 changes: 6 additions & 2 deletions server/src/main/java/org/elasticsearch/ingest/Processor.java
@@ -19,6 +19,8 @@

package org.elasticsearch.ingest;

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
@@ -114,14 +116,16 @@ class Parameters {

/**
* Provides access to an engine searcher of a locally allocated index, resolved from the provided index expression.
* The function takes an index expression and returns both the {@link IndexMetaData} of the resolved locally
* allocated index and an {@link Engine.Searcher} instance for that index.
*
* The locally allocated index must have a single primary shard.
*/
public final Function<String, Engine.Searcher> localShardSearcher;
public final Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher;

public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
IngestService ingestService, Function<String, Engine.Searcher> localShardSearcher) {
IngestService ingestService, Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher) {
this.env = env;
this.scriptService = scriptService;
this.threadContext = threadContext;
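
Parameters#localShardSearcher above changes from Function<String, Engine.Searcher> to Function<String, Tuple<IndexMetaData, Engine.Searcher>>, so callers now get the resolved index's metadata alongside the searcher. A hedged usage sketch of the new signature (the class and method names below are made up; the v1()/v2() handling mirrors what the ingest test processor and ExactMatchProcessor later in this diff do):

import java.util.Map;
import java.util.function.Function;

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.engine.Engine;

class LocalShardSearcherUsageSketch {

    static String readEnrichKeyField(Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher,
                                     String indexExpression) {
        Tuple<IndexMetaData, Engine.Searcher> resolved = localShardSearcher.apply(indexExpression);
        // v1() is the IndexMetaData of the resolved, locally allocated index; v2() is the
        // engine searcher, which must be released, hence the try-with-resources.
        try (Engine.Searcher searcher = resolved.v2()) {
            if (searcher.getDirectoryReader().leaves().isEmpty()) {
                return null; // nothing indexed locally yet
            }
            Map<String, Object> mappingSource = resolved.v1().mapping().getSourceAsMap();
            Map<?, ?> meta = (Map<?, ?>) mappingSource.get("_meta");
            return meta == null ? null : (String) meta.get("enrich_key_field");
        }
    }
}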
@@ -29,7 +29,9 @@
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexModule;
@@ -180,17 +182,17 @@ static class TestProcessor extends AbstractProcessor {

static final String NAME = "test_processor";

private final Function<String, Engine.Searcher> localShardSearcher;
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher;

TestProcessor(String tag, Function<String, Engine.Searcher> localShardSearcher) {
TestProcessor(String tag, Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher) {
super(tag);
this.localShardSearcher = localShardSearcher;
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
String indexExpression = "reference-index";
try (Engine.Searcher engineSearcher = localShardSearcher.apply(indexExpression)) {
try (Engine.Searcher engineSearcher = localShardSearcher.apply(indexExpression).v2()) {
// Ensure that search wrapper has been invoked by checking the directory instance type:
if ((engineSearcher.getDirectoryReader() instanceof TestDirectyReader) == false) {
// asserting or throwing an AssertionError makes this test hang:
@@ -210,9 +212,9 @@ public String getType() {

static class Factory implements Processor.Factory {

private final Function<String, Engine.Searcher> localShardSearcher;
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher;

Factory(Function<String, Engine.Searcher> localShardSearcher) {
Factory(Function<String, Tuple<IndexMetaData, Engine.Searcher>> localShardSearcher) {
this.localShardSearcher = localShardSearcher;
}

@@ -132,7 +132,7 @@ public String getSchedule() {
return schedule;
}

public String getBaseName(String policyName) {
public static String getBaseName(String policyName) {
return ENRICH_INDEX_NAME_BASE + policyName;
}

@@ -10,6 +10,7 @@
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
@@ -45,6 +46,10 @@ public void testBasicFlow() throws Exception {
"\"enrich_values\": [\"globalRank\", \"tldRank\", \"tld\"], \"schedule\": \"0 5 * * *\"}");
assertOK(client().performRequest(putPolicyRequest));

// Create the enrich index manually (remove when the execute policy API has been added)
String mapping = "\"_meta\": {\"enrich_key_field\": \"host\"}";
createIndex(".enrich-my_policy", Settings.EMPTY, mapping);

// Add a single enrich document for now and then refresh:
Request indexRequest = new Request("PUT", "/.enrich-my_policy/_doc/elastic.co");
XContentBuilder document = XContentBuilder.builder(XContentType.SMILE.xContent());
@@ -45,6 +45,8 @@
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;

import static org.elasticsearch.xpack.enrich.ExactMatchProcessor.ENRICH_KEY_FIELD_NAME;

public class EnrichPolicyRunner implements Runnable {

private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class);
@@ -145,6 +147,9 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
.field("doc_values", false)
.endObject()
.endObject()
.startObject("_meta")
.field(ENRICH_KEY_FIELD_NAME, policy.getEnrichKey())
.endObject()
.endObject()
.endObject();

@@ -156,7 +161,7 @@

private void prepareAndCreateEnrichIndex() {
long nowTimestamp = nowSupplier.getAsLong();
String enrichIndexName = policy.getBaseName(policyName) + "-" + nowTimestamp;
String enrichIndexName = EnrichPolicy.getBaseName(policyName) + "-" + nowTimestamp;
Settings enrichIndexSettings = Settings.builder()
.put("index.auto_expand_replicas", "0-all")
.build();
@@ -231,7 +236,7 @@ public void onFailure(Exception e) {
}

private void updateEnrichPolicyAlias(final String destinationIndexName) {
String enrichIndexBase = policy.getBaseName(policyName);
String enrichIndexBase = EnrichPolicy.getBaseName(policyName);
logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase);
GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase);
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), aliasRequest);
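
With getBaseName now static on EnrichPolicy, the runner derives both the concrete enrich index name and the alias it promotes from the same base name. A small illustrative sketch of that naming scheme (the ".enrich-" prefix matches the ".enrich-my_policy" index created in the REST test above; the policy name and timestamp values are made up):

import org.elasticsearch.xpack.core.enrich.EnrichPolicy;

class EnrichIndexNamingSketch {

    static String enrichIndexNameFor(String policyName, long nowTimestamp) {
        // e.g. policyName = "my_policy" -> base = ".enrich-my_policy"
        String enrichIndexBase = EnrichPolicy.getBaseName(policyName);
        // e.g. ".enrich-my_policy-1558000000000"; updateEnrichPolicyAlias later points the
        // enrichIndexBase alias at this concrete index.
        return enrichIndexBase + "-" + nowTimestamp;
    }
}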
@@ -6,6 +6,8 @@
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
@@ -22,10 +24,10 @@ final class EnrichProcessorFactory implements Processor.Factory {
static final String TYPE = "enrich";

private final Function<String, EnrichPolicy> policyLookup;
private final Function<String, Engine.Searcher> searchProvider;
private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider;

EnrichProcessorFactory(Supplier<ClusterState> clusterStateSupplier,
Function<String, Engine.Searcher> searchProvider) {
Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider) {
this.policyLookup = policyName -> EnrichStore.getPolicy(policyName, clusterStateSupplier.get());
this.searchProvider = searchProvider;
}
@@ -57,7 +59,7 @@ public Processor create(Map<String, Processor.Factory> processorFactories, String

switch (policy.getType()) {
case EnrichPolicy.EXACT_MATCH_TYPE:
return new ExactMatchProcessor(tag, policyLookup, searchProvider, policyName, enrichKey, ignoreMissing, specifications);
return new ExactMatchProcessor(tag, searchProvider, policyName, enrichKey, ignoreMissing, specifications);
default:
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
}
@@ -11,8 +11,10 @@
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.Engine;
@@ -29,23 +31,22 @@

final class ExactMatchProcessor extends AbstractProcessor {

private final Function<String, EnrichPolicy> policyLookup;
private final Function<String, Engine.Searcher> searchProvider;
static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field";

private final Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider;

private final String policyName;
private final String enrichKey;
private final boolean ignoreMissing;
private final List<EnrichSpecification> specifications;

ExactMatchProcessor(String tag,
Function<String, EnrichPolicy> policyLookup,
Function<String, Engine.Searcher> searchProvider,
Function<String, Tuple<IndexMetaData, Engine.Searcher>> searchProvider,
String policyName,
String enrichKey,
boolean ignoreMissing,
List<EnrichSpecification> specifications) {
super(tag);
this.policyLookup = policyLookup;
this.searchProvider = searchProvider;
this.policyName = policyName;
this.enrichKey = enrichKey;
@@ -55,28 +56,26 @@ final class ExactMatchProcessor extends AbstractProcessor {

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
final EnrichPolicy policy = policyLookup.apply(policyName);
if (policy == null) {
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
}

final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing);
if (value == null) {
return ingestDocument;
}

// TODO: re-use the engine searcher between enriching documents from the same write request
try (Engine.Searcher engineSearcher = searchProvider.apply(policy.getBaseName(policyName))) {
Tuple<IndexMetaData, Engine.Searcher> tuple = searchProvider.apply(EnrichPolicy.getBaseName(policyName));
String enrichKeyField = getEnrichKeyField(tuple.v1());

try (Engine.Searcher engineSearcher = tuple.v2()) {
if (engineSearcher.getDirectoryReader().leaves().size() == 0) {
return ingestDocument;
} else if (engineSearcher.getDirectoryReader().leaves().size() != 1) {
throw new IllegalStateException("enrich index must have exactly a single segment");
}

final LeafReader leafReader = engineSearcher.getDirectoryReader().leaves().get(0).reader();
final Terms terms = leafReader.terms(policy.getEnrichKey());
final Terms terms = leafReader.terms(enrichKeyField);
if (terms == null) {
throw new IllegalStateException("enrich key field [" + policy.getEnrichKey() + "] does not exist");
throw new IllegalStateException("enrich key field does not exist");
}

final TermsEnum tenum = terms.iterator();
@@ -124,4 +123,22 @@ boolean isIgnoreMissing() {
List<EnrichSpecification> getSpecifications() {
return specifications;
}

private static String getEnrichKeyField(IndexMetaData imd) {
if (imd == null) {
throw new IllegalStateException("enrich index is missing");
}

Map<String, Object> mappingSource = imd.mapping().getSourceAsMap();
Map<?, ?> meta = (Map<?, ?>) mappingSource.get("_meta");
if (meta == null) {
throw new IllegalStateException("_meta field is missing in enrich index");
}

String fieldName = (String) meta.get(ENRICH_KEY_FIELD_NAME);
if (fieldName == null) {
throw new IllegalStateException("enrich key field name missing in the enrich index _meta");
}
return fieldName;
}
}