Skip to content

Commit 71bcfbf

Browse files
committed
Replace required pipeline with final pipeline (#49470)
This commit enhances the required pipeline functionality by changing it so that default/request pipelines can also be executed, but the required pipeline is always executed last. This gives users the flexibility to execute their own indexing pipelines, but also ensure that any required pipelines are also executed. Since such pipelines are executed last, we change the name of required pipelines to final pipelines.
1 parent 1431c2b commit 71bcfbf

File tree

13 files changed

+518
-372
lines changed

13 files changed

+518
-372
lines changed

docs/reference/index-modules.asciidoc

+6-6
Original file line numberDiff line numberDiff line change
@@ -244,12 +244,12 @@ specific index module:
244244
overridden using the `pipeline` parameter. The special pipeline name `_none` indicates
245245
no ingest pipeline should be run.
246246

247-
`index.required_pipeline`::
248-
The required <<ingest,ingest node>> pipeline for this index. Index requests
249-
will fail if the required pipeline is set and the pipeline does not exist.
250-
The required pipeline can not be overridden with the `pipeline` parameter. A
251-
default pipeline and a required pipeline can not both be set. The special
252-
pipeline name `_none` indicates no ingest pipeline will run.
247+
`index.final_pipeline`::
248+
The final <<ingest,ingest node>> pipeline for this index. Index requests
249+
will fail if the final pipeline is set and the pipeline does not exist.
250+
The final pipeline always runs after the request pipeline (if specified) and
251+
the default pipeline (if it exists). The special pipeline name `_none`
252+
indicates no ingest pipeline will run.
253253

254254
[float]
255255
=== Settings in other index modules

docs/reference/ingest.asciidoc

+5-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
[partintro]
55
--
6-
Use an ingest node to pre-process documents before the actual document indexing happens.
6+
Use an ingest node to pre-process documents before the actual document indexing happens.
77
The ingest node intercepts bulk and index requests, it applies transformations, and it then
88
passes the documents back to the index or bulk APIs.
99

@@ -23,7 +23,7 @@ another processor that renames a field. The <<cluster-state,cluster state>> then
2323
the configured pipelines.
2424

2525
To use a pipeline, simply specify the `pipeline` parameter on an index or bulk request. This
26-
way, the ingest node knows which pipeline to use.
26+
way, the ingest node knows which pipeline to use.
2727

2828
For example:
2929
Create a pipeline
@@ -79,6 +79,9 @@ Response:
7979
An index may also declare a <<dynamic-index-settings,default pipeline>> that will be used in the
8080
absence of the `pipeline` parameter.
8181

82+
Finally, an index may also declare a <<dynamic-index-settings,final pipeline>>
83+
that will be executed after any request or default pipeline (if any).
84+
8285
See <<ingest-apis,Ingest APIs>> for more information about creating, adding, and deleting pipelines.
8386

8487
--

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml

+8-17
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ teardown:
66
ignore: 404
77

88
---
9-
"Test index with required pipeline":
9+
"Test index with final pipeline":
1010
- do:
1111
ingest.put_pipeline:
1212
id: "my_pipeline"
@@ -23,14 +23,14 @@ teardown:
2323
]
2424
}
2525
- match: { acknowledged: true }
26-
# required pipeline via index
26+
# final pipeline via index
2727
- do:
2828
indices.create:
2929
index: test
3030
body:
3131
settings:
3232
index:
33-
required_pipeline: "my_pipeline"
33+
final_pipeline: "my_pipeline"
3434
aliases:
3535
test_alias: {}
3636

@@ -46,7 +46,7 @@ teardown:
4646
id: 1
4747
- match: { _source.bytes_source_field: "1kb" }
4848
- match: { _source.bytes_target_field: 1024 }
49-
# required pipeline via alias
49+
# final pipeline via alias
5050
- do:
5151
index:
5252
index: test_alias
@@ -59,7 +59,7 @@ teardown:
5959
id: 2
6060
- match: { _source.bytes_source_field: "1kb" }
6161
- match: { _source.bytes_target_field: 1024 }
62-
# required pipeline via upsert
62+
# final pipeline via upsert
6363
- do:
6464
update:
6565
index: test
@@ -75,7 +75,7 @@ teardown:
7575
id: 3
7676
- match: { _source.bytes_source_field: "1kb" }
7777
- match: { _source.bytes_target_field: 1024 }
78-
# required pipeline via scripted upsert
78+
# final pipeline via scripted upsert
7979
- do:
8080
update:
8181
index: test
@@ -92,7 +92,7 @@ teardown:
9292
id: 4
9393
- match: { _source.bytes_source_field: "1kb" }
9494
- match: { _source.bytes_target_field: 1024 }
95-
# required pipeline via doc_as_upsert
95+
# final pipeline via doc_as_upsert
9696
- do:
9797
update:
9898
index: test
@@ -106,7 +106,7 @@ teardown:
106106
id: 5
107107
- match: { _source.bytes_source_field: "1kb" }
108108
- match: { _source.bytes_target_field: 1024 }
109-
# required pipeline via bulk upsert
109+
# final pipeline via bulk upsert
110110
# note - bulk scripted upsert's execute the pipeline before the script, so any data referenced by the pipeline
111111
# needs to be in the upsert, not the script
112112
- do:
@@ -164,12 +164,3 @@ teardown:
164164
- match: { docs.5._source.bytes_source_field: "3kb" }
165165
- match: { docs.5._source.bytes_target_field: 3072 }
166166
- match: { docs.5._source.ran_script: true }
167-
168-
# bad request, request pipeline can not be specified
169-
- do:
170-
catch: /illegal_argument_exception.*request pipeline \[pipeline\] can not override required pipeline \[my_pipeline\]/
171-
index:
172-
index: test
173-
id: 9
174-
pipeline: "pipeline"
175-
body: {bytes_source_field: "1kb"}

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

+34-57
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@
7979
import java.util.HashSet;
8080
import java.util.Iterator;
8181
import java.util.List;
82-
import java.util.Locale;
8382
import java.util.Map;
8483
import java.util.Objects;
8584
import java.util.Set;
@@ -167,7 +166,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
167166
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
168167
if (indexRequest != null) {
169168
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
170-
boolean indexRequestHasPipeline = resolveRequiredOrDefaultPipeline(actionRequest, indexRequest, metaData);
169+
boolean indexRequestHasPipeline = resolvePipelines(actionRequest, indexRequest, metaData);
171170
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
172171
}
173172

@@ -273,16 +272,14 @@ public void onFailure(Exception e) {
273272
}
274273
}
275274

276-
static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest<?> originalRequest,
277-
IndexRequest indexRequest,
278-
MetaData metaData) {
279-
275+
static boolean resolvePipelines(final DocWriteRequest<?> originalRequest, final IndexRequest indexRequest, final MetaData metaData) {
280276
if (indexRequest.isPipelineResolved() == false) {
281277
final String requestPipeline = indexRequest.getPipeline();
282278
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
283-
boolean requestCanOverridePipeline = true;
284-
String requiredPipeline = null;
285-
// start to look for default or required pipelines via settings found in the index meta data
279+
indexRequest.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME);
280+
String defaultPipeline = null;
281+
String finalPipeline = null;
282+
// start to look for default or final pipelines via settings found in the index meta data
286283
IndexMetaData indexMetaData = metaData.indices().get(originalRequest.index());
287284
// check the alias for the index request (this is how normal index requests are modeled)
288285
if (indexMetaData == null && indexRequest.index() != null) {
@@ -302,64 +299,42 @@ static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest<?> originalReque
302299
}
303300
if (indexMetaData != null) {
304301
final Settings indexSettings = indexMetaData.getSettings();
305-
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
306-
// find the required pipeline if one is defined from an existing index
307-
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
308-
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
309-
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
310-
indexRequest.setPipeline(requiredPipeline);
311-
requestCanOverridePipeline = false;
312-
} else {
313-
// find the default pipeline if one is defined from an existing index
314-
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
302+
if (IndexSettings.DEFAULT_PIPELINE.exists(indexSettings)) {
303+
// find the default pipeline if one is defined from an existing index setting
304+
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
315305
indexRequest.setPipeline(defaultPipeline);
316306
}
307+
if (IndexSettings.FINAL_PIPELINE.exists(indexSettings)) {
308+
// find the final pipeline if one is defined from an existing index setting
309+
finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexSettings);
310+
indexRequest.setFinalPipeline(finalPipeline);
311+
}
317312
} else if (indexRequest.index() != null) {
318-
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
313+
// the index does not exist yet (and this is a valid request), so match index templates to look for pipelines
319314
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
320315
assert (templates != null);
321-
// order of templates are highest order first, we have to iterate through them all though
322-
String defaultPipeline = null;
323-
for (IndexTemplateMetaData template : templates) {
316+
// order of templates are highest order first
317+
for (final IndexTemplateMetaData template : templates) {
324318
final Settings settings = template.settings();
325-
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
326-
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
327-
requestCanOverridePipeline = false;
328-
// we can not break in case a lower-order template has a default pipeline that we need to reject
329-
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
319+
if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
330320
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
331-
// we can not break in case a lower-order template has a required pipeline that we need to reject
321+
// we can not break in case a lower-order template has a final pipeline that we need to collect
322+
}
323+
if (finalPipeline == null && IndexSettings.FINAL_PIPELINE.exists(settings)) {
324+
finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings);
325+
// we can not break in case a lower-order template has a default pipeline that we need to collect
326+
}
327+
if (defaultPipeline != null && finalPipeline != null) {
328+
// we can break if we have already collected a default and final pipeline
329+
break;
332330
}
333331
}
334-
if (requiredPipeline != null && defaultPipeline != null) {
335-
// we can not have picked up a required and a default pipeline from applying templates
336-
final String message = String.format(
337-
Locale.ROOT,
338-
"required pipeline [%s] and default pipeline [%s] can not both be set",
339-
requiredPipeline,
340-
defaultPipeline);
341-
throw new IllegalArgumentException(message);
342-
}
343-
final String pipeline;
344-
if (requiredPipeline != null) {
345-
pipeline = requiredPipeline;
346-
} else {
347-
pipeline = defaultPipeline != null ? defaultPipeline : IngestService.NOOP_PIPELINE_NAME;
348-
}
349-
indexRequest.setPipeline(pipeline);
332+
indexRequest.setPipeline(defaultPipeline != null ? defaultPipeline : IngestService.NOOP_PIPELINE_NAME);
333+
indexRequest.setFinalPipeline(finalPipeline != null ? finalPipeline : IngestService.NOOP_PIPELINE_NAME);
350334
}
351335

352336
if (requestPipeline != null) {
353-
if (requestCanOverridePipeline == false) {
354-
final String message = String.format(
355-
Locale.ROOT,
356-
"request pipeline [%s] can not override required pipeline [%s]",
357-
requestPipeline,
358-
requiredPipeline);
359-
throw new IllegalArgumentException(message);
360-
} else {
361-
indexRequest.setPipeline(requestPipeline);
362-
}
337+
indexRequest.setPipeline(requestPipeline);
363338
}
364339

365340
/*
@@ -375,8 +350,10 @@ static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest<?> originalReque
375350
indexRequest.isPipelineResolved(true);
376351
}
377352

378-
// Return whether this index request has a pipeline
379-
return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false;
353+
354+
// return whether this index request has a pipeline
355+
return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false
356+
|| IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false;
380357
}
381358

382359
boolean needToCheck() {

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

+30
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
101101
private XContentType contentType;
102102

103103
private String pipeline;
104+
private String finalPipeline;
104105

105106
private boolean isPipelineResolved;
106107

@@ -133,6 +134,9 @@ public IndexRequest(StreamInput in) throws IOException {
133134
version = in.readLong();
134135
versionType = VersionType.fromValue(in.readByte());
135136
pipeline = in.readOptionalString();
137+
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
138+
finalPipeline = in.readOptionalString();
139+
}
136140
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
137141
isPipelineResolved = in.readBoolean();
138142
}
@@ -246,6 +250,9 @@ public ActionRequestValidationException validate() {
246250
validationException = addValidationError("pipeline cannot be an empty string", validationException);
247251
}
248252

253+
if (finalPipeline != null && finalPipeline.isEmpty()) {
254+
validationException = addValidationError("final pipeline cannot be an empty string", validationException);
255+
}
249256

250257
return validationException;
251258
}
@@ -350,6 +357,26 @@ public String getPipeline() {
350357
return this.pipeline;
351358
}
352359

360+
/**
361+
* Sets the final ingest pipeline to be executed before indexing the document.
362+
*
363+
* @param finalPipeline the name of the final pipeline
364+
* @return this index request
365+
*/
366+
public IndexRequest setFinalPipeline(final String finalPipeline) {
367+
this.finalPipeline = finalPipeline;
368+
return this;
369+
}
370+
371+
/**
372+
* Returns the final ingest pipeline to be executed before indexing the document.
373+
*
374+
* @return the name of the final pipeline
375+
*/
376+
public String getFinalPipeline() {
377+
return this.finalPipeline;
378+
}
379+
353380
/**
354381
* Sets if the pipeline for this request has been resolved by the coordinating node.
355382
*
@@ -686,6 +713,9 @@ public void writeTo(StreamOutput out) throws IOException {
686713
out.writeLong(version);
687714
out.writeByte(versionType.getValue());
688715
out.writeOptionalString(pipeline);
716+
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
717+
out.writeOptionalString(finalPipeline);
718+
}
689719
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
690720
out.writeBoolean(isPipelineResolved);
691721
}

server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
166166
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
167167
IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
168168
IndexSettings.DEFAULT_PIPELINE,
169-
IndexSettings.REQUIRED_PIPELINE,
169+
IndexSettings.FINAL_PIPELINE,
170170
MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
171171

172172
// validate that built-in similarities don't get redefined

0 commit comments

Comments
 (0)