Skip to content

Commit a66c0dc

Browse files
committed
Add pipeline to ensure unique Enrich index documents (#46348)
Adds a pipeline that removes ids and routing from documents before indexing them into enrich indices. Enrich documents may come from multiple indices, and thus have id collisions on them. This pipeline ensures that documents with colliding id fields do not clobber one another during the reindex operation while executing an enrich policy.
1 parent 0e1b775 commit a66c0dc

File tree

6 files changed

+350
-11
lines changed

6 files changed

+350
-11
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.enrich;
7+
8+
import java.io.IOException;
9+
import java.io.UncheckedIOException;
10+
11+
import org.elasticsearch.Version;
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.ingest.PutPipelineRequest;
14+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
15+
import org.elasticsearch.client.Client;
16+
import org.elasticsearch.cluster.ClusterState;
17+
import org.elasticsearch.common.bytes.BytesReference;
18+
import org.elasticsearch.common.xcontent.XContentBuilder;
19+
import org.elasticsearch.common.xcontent.XContentType;
20+
import org.elasticsearch.ingest.IngestMetadata;
21+
import org.elasticsearch.ingest.PipelineConfiguration;
22+
23+
/**
24+
* Manages the definitions and lifecycle of the ingest pipeline used by the reindex operation within the Enrich Policy execution.
25+
*/
26+
public class EnrichPolicyReindexPipeline {
27+
28+
/**
29+
* The current version of the pipeline definition. Used in the pipeline's name to differentiate from breaking changes
30+
* (separate from product version).
31+
*/
32+
static final String CURRENT_PIPELINE_VERSION_NAME = "7";
33+
34+
/**
35+
* The last version of the distribution that updated the pipelines definition.
36+
* TODO: This should be the version of ES that Enrich first ships in, which likely doesn't exist yet.
37+
*/
38+
static final int ENRICH_PIPELINE_LAST_UPDATED_VERSION = Version.V_7_4_0.id;
39+
40+
static String pipelineName() {
41+
return "enrich-policy-reindex-" + CURRENT_PIPELINE_VERSION_NAME;
42+
}
43+
44+
/**
45+
* Checks if the current version of the pipeline definition is installed in the cluster
46+
* @param clusterState The cluster state to check
47+
* @return true if a pipeline exists that is compatible with this version of Enrich, false otherwise
48+
*/
49+
static boolean exists(ClusterState clusterState) {
50+
final IngestMetadata ingestMetadata = clusterState.getMetaData().custom(IngestMetadata.TYPE);
51+
// we ensure that we both have the pipeline and its version represents the current (or later) version
52+
if (ingestMetadata != null) {
53+
final PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(pipelineName());
54+
if (pipeline != null) {
55+
Object version = pipeline.getConfigAsMap().get("version");
56+
return version instanceof Number && ((Number) version).intValue() >= ENRICH_PIPELINE_LAST_UPDATED_VERSION;
57+
}
58+
}
59+
return false;
60+
}
61+
62+
/**
63+
* Creates a pipeline with the current version's pipeline definition
64+
* @param client Client used to execute put pipeline
65+
* @param listener Callback used after pipeline has been created
66+
*/
67+
public static void create(Client client, ActionListener<AcknowledgedResponse> listener) {
68+
final BytesReference pipeline = BytesReference.bytes(currentEnrichPipelineDefinition(XContentType.JSON));
69+
final PutPipelineRequest request = new PutPipelineRequest(pipelineName(), pipeline, XContentType.JSON);
70+
client.admin().cluster().putPipeline(request, listener);
71+
}
72+
73+
private static XContentBuilder currentEnrichPipelineDefinition(XContentType xContentType) {
74+
try {
75+
return XContentBuilder.builder(xContentType.xContent())
76+
.startObject()
77+
.field("description", "This pipeline sanitizes documents that will be stored in enrich indices for ingest lookup " +
78+
"purposes. It is an internal pipeline and should not be modified.")
79+
.field("version", ENRICH_PIPELINE_LAST_UPDATED_VERSION)
80+
.startArray("processors")
81+
.startObject()
82+
// remove the id from the document so that documents from multiple indices will always be unique.
83+
.startObject("remove")
84+
.field("field", "_id")
85+
.endObject()
86+
.endObject()
87+
.endArray()
88+
.endObject();
89+
} catch (final IOException e) {
90+
throw new UncheckedIOException("Failed to create pipeline for enrich document sanitization", e);
91+
}
92+
}
93+
94+
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ private void prepareAndCreateEnrichIndex() {
250250
client.admin().indices().create(createEnrichIndexRequest, new ActionListener<CreateIndexResponse>() {
251251
@Override
252252
public void onResponse(CreateIndexResponse createIndexResponse) {
253-
transferDataToEnrichIndex(enrichIndexName);
253+
prepareReindexOperation(enrichIndexName);
254254
}
255255

256256
@Override
@@ -260,6 +260,25 @@ public void onFailure(Exception e) {
260260
});
261261
}
262262

263+
private void prepareReindexOperation(final String destinationIndexName) {
264+
// Check to make sure that the enrich pipeline exists, and create it if it is missing.
265+
if (EnrichPolicyReindexPipeline.exists(clusterService.state()) == false) {
266+
EnrichPolicyReindexPipeline.create(client, new ActionListener<AcknowledgedResponse>() {
267+
@Override
268+
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
269+
transferDataToEnrichIndex(destinationIndexName);
270+
}
271+
272+
@Override
273+
public void onFailure(Exception e) {
274+
listener.onFailure(e);
275+
}
276+
});
277+
} else {
278+
transferDataToEnrichIndex(destinationIndexName);
279+
}
280+
}
281+
263282
private void transferDataToEnrichIndex(final String destinationIndexName) {
264283
logger.debug("Policy [{}]: Transferring source data to new enrich index [{}]", policyName, destinationIndexName);
265284
// Filter down the source fields to just the ones required by the policy
@@ -277,6 +296,8 @@ private void transferDataToEnrichIndex(final String destinationIndexName) {
277296
.setSourceIndices(policy.getIndices().toArray(new String[0]));
278297
reindexRequest.getSearchRequest().source(searchSourceBuilder);
279298
reindexRequest.getDestination().source(new BytesArray(new byte[0]), XContentType.SMILE);
299+
reindexRequest.getDestination().routing("discard");
300+
reindexRequest.getDestination().setPipeline(EnrichPolicyReindexPipeline.pipelineName());
280301
client.execute(ReindexAction.INSTANCE, reindexRequest, new ActionListener<BulkByScrollResponse>() {
281302
@Override
282303
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.common.bytes.BytesArray;
1818
import org.elasticsearch.common.xcontent.XContentType;
1919
import org.elasticsearch.index.reindex.ReindexPlugin;
20+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
2021
import org.elasticsearch.plugins.Plugin;
2122
import org.elasticsearch.test.ESSingleNodeTestCase;
2223
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@@ -46,7 +47,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
4647

4748
@Override
4849
protected Collection<Class<? extends Plugin>> getPlugins() {
49-
return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class);
50+
return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
5051
}
5152

5253
@Override

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.settings.Settings;
2222
import org.elasticsearch.common.xcontent.XContentType;
2323
import org.elasticsearch.index.reindex.ReindexPlugin;
24+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
2425
import org.elasticsearch.node.Node;
2526
import org.elasticsearch.plugins.Plugin;
2627
import org.elasticsearch.test.ESIntegTestCase;
@@ -59,7 +60,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
5960

6061
@Override
6162
protected Collection<Class<? extends Plugin>> nodePlugins() {
62-
return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class);
63+
return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
6364
}
6465

6566
@Override

0 commit comments

Comments
 (0)