Skip to content

Commit da20957

Browse files
authored
Replace required pipeline with final pipeline (#49470)
This commit enhances the required pipeline functionality by changing it so that default/request pipelines can also be executed, but the required pipeline is always executed last. This gives users the flexibility to execute their own indexing pipelines, but also ensure that any required pipelines are also executed. Since such pipelines are executed last, we change the name of required pipelines to final pipelines.
1 parent 3b92572 commit da20957

File tree

13 files changed

+507
-373
lines changed

13 files changed

+507
-373
lines changed

docs/reference/index-modules.asciidoc

+6-6
Original file line numberDiff line numberDiff line change
@@ -244,12 +244,12 @@ specific index module:
244244
overridden using the `pipeline` parameter. The special pipeline name `_none` indicates
245245
no ingest pipeline should be run.
246246

247-
`index.required_pipeline`::
248-
The required <<ingest,ingest node>> pipeline for this index. Index requests
249-
will fail if the required pipeline is set and the pipeline does not exist.
250-
The required pipeline can not be overridden with the `pipeline` parameter. A
251-
default pipeline and a required pipeline can not both be set. The special
252-
pipeline name `_none` indicates no ingest pipeline will run.
247+
`index.final_pipeline`::
248+
The final <<ingest,ingest node>> pipeline for this index. Index requests
249+
will fail if the final pipeline is set and the pipeline does not exist.
250+
The final pipeline always runs after the request pipeline (if specified) and
251+
the default pipeline (if it exists). The special pipeline name `_none`
252+
indicates no ingest pipeline will run.
253253

254254
[float]
255255
=== Settings in other index modules

docs/reference/ingest.asciidoc

+5-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
[partintro]
55
--
6-
Use an ingest node to pre-process documents before the actual document indexing happens.
6+
Use an ingest node to pre-process documents before the actual document indexing happens.
77
The ingest node intercepts bulk and index requests, it applies transformations, and it then
88
passes the documents back to the index or bulk APIs.
99

@@ -23,7 +23,7 @@ another processor that renames a field. The <<cluster-state,cluster state>> then
2323
the configured pipelines.
2424

2525
To use a pipeline, simply specify the `pipeline` parameter on an index or bulk request. This
26-
way, the ingest node knows which pipeline to use.
26+
way, the ingest node knows which pipeline to use.
2727

2828
For example:
2929
Create a pipeline
@@ -78,6 +78,9 @@ Response:
7878
An index may also declare a <<dynamic-index-settings,default pipeline>> that will be used in the
7979
absence of the `pipeline` parameter.
8080

81+
Finally, an index may also declare a <<dynamic-index-settings,final pipeline>>
82+
that will be executed after any request or default pipeline (if any).
83+
8184
See <<ingest-apis,Ingest APIs>> for more information about creating, adding, and deleting pipelines.
8285

8386
--

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml

+8-17
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ teardown:
66
ignore: 404
77

88
---
9-
"Test index with required pipeline":
9+
"Test index with final pipeline":
1010
- do:
1111
ingest.put_pipeline:
1212
id: "my_pipeline"
@@ -23,14 +23,14 @@ teardown:
2323
]
2424
}
2525
- match: { acknowledged: true }
26-
# required pipeline via index
26+
# final pipeline via index
2727
- do:
2828
indices.create:
2929
index: test
3030
body:
3131
settings:
3232
index:
33-
required_pipeline: "my_pipeline"
33+
final_pipeline: "my_pipeline"
3434
aliases:
3535
test_alias: {}
3636

@@ -46,7 +46,7 @@ teardown:
4646
id: 1
4747
- match: { _source.bytes_source_field: "1kb" }
4848
- match: { _source.bytes_target_field: 1024 }
49-
# required pipeline via alias
49+
# final pipeline via alias
5050
- do:
5151
index:
5252
index: test_alias
@@ -59,7 +59,7 @@ teardown:
5959
id: 2
6060
- match: { _source.bytes_source_field: "1kb" }
6161
- match: { _source.bytes_target_field: 1024 }
62-
# required pipeline via upsert
62+
# final pipeline via upsert
6363
- do:
6464
update:
6565
index: test
@@ -75,7 +75,7 @@ teardown:
7575
id: 3
7676
- match: { _source.bytes_source_field: "1kb" }
7777
- match: { _source.bytes_target_field: 1024 }
78-
# required pipeline via scripted upsert
78+
# final pipeline via scripted upsert
7979
- do:
8080
update:
8181
index: test
@@ -92,7 +92,7 @@ teardown:
9292
id: 4
9393
- match: { _source.bytes_source_field: "1kb" }
9494
- match: { _source.bytes_target_field: 1024 }
95-
# required pipeline via doc_as_upsert
95+
# final pipeline via doc_as_upsert
9696
- do:
9797
update:
9898
index: test
@@ -106,7 +106,7 @@ teardown:
106106
id: 5
107107
- match: { _source.bytes_source_field: "1kb" }
108108
- match: { _source.bytes_target_field: 1024 }
109-
# required pipeline via bulk upsert
109+
# final pipeline via bulk upsert
110110
# note - bulk scripted upsert's execute the pipeline before the script, so any data referenced by the pipeline
111111
# needs to be in the upsert, not the script
112112
- do:
@@ -164,12 +164,3 @@ teardown:
164164
- match: { docs.5._source.bytes_source_field: "3kb" }
165165
- match: { docs.5._source.bytes_target_field: 3072 }
166166
- match: { docs.5._source.ran_script: true }
167-
168-
# bad request, request pipeline can not be specified
169-
- do:
170-
catch: /illegal_argument_exception.*request pipeline \[pipeline\] can not override required pipeline \[my_pipeline\]/
171-
index:
172-
index: test
173-
id: 9
174-
pipeline: "pipeline"
175-
body: {bytes_source_field: "1kb"}

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

+34-57
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@
7979
import java.util.HashSet;
8080
import java.util.Iterator;
8181
import java.util.List;
82-
import java.util.Locale;
8382
import java.util.Map;
8483
import java.util.Objects;
8584
import java.util.Set;
@@ -163,7 +162,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
163162
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
164163
if (indexRequest != null) {
165164
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
166-
boolean indexRequestHasPipeline = resolveRequiredOrDefaultPipeline(actionRequest, indexRequest, metaData);
165+
boolean indexRequestHasPipeline = resolvePipelines(actionRequest, indexRequest, metaData);
167166
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
168167
}
169168

@@ -269,16 +268,14 @@ public void onFailure(Exception e) {
269268
}
270269
}
271270

272-
static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest<?> originalRequest,
273-
IndexRequest indexRequest,
274-
MetaData metaData) {
275-
271+
static boolean resolvePipelines(final DocWriteRequest<?> originalRequest, final IndexRequest indexRequest, final MetaData metaData) {
276272
if (indexRequest.isPipelineResolved() == false) {
277273
final String requestPipeline = indexRequest.getPipeline();
278274
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
279-
boolean requestCanOverridePipeline = true;
280-
String requiredPipeline = null;
281-
// start to look for default or required pipelines via settings found in the index meta data
275+
indexRequest.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME);
276+
String defaultPipeline = null;
277+
String finalPipeline = null;
278+
// start to look for default or final pipelines via settings found in the index meta data
282279
IndexMetaData indexMetaData = metaData.indices().get(originalRequest.index());
283280
// check the alias for the index request (this is how normal index requests are modeled)
284281
if (indexMetaData == null && indexRequest.index() != null) {
@@ -298,64 +295,42 @@ static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest<?> originalReque
298295
}
299296
if (indexMetaData != null) {
300297
final Settings indexSettings = indexMetaData.getSettings();
301-
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
302-
// find the required pipeline if one is defined from an existing index
303-
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
304-
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
305-
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
306-
indexRequest.setPipeline(requiredPipeline);
307-
requestCanOverridePipeline = false;
308-
} else {
309-
// find the default pipeline if one is defined from an existing index
310-
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
298+
if (IndexSettings.DEFAULT_PIPELINE.exists(indexSettings)) {
299+
// find the default pipeline if one is defined from an existing index setting
300+
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
311301
indexRequest.setPipeline(defaultPipeline);
312302
}
303+
if (IndexSettings.FINAL_PIPELINE.exists(indexSettings)) {
304+
// find the final pipeline if one is defined from an existing index setting
305+
finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexSettings);
306+
indexRequest.setFinalPipeline(finalPipeline);
307+
}
313308
} else if (indexRequest.index() != null) {
314-
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
309+
// the index does not exist yet (and this is a valid request), so match index templates to look for pipelines
315310
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
316311
assert (templates != null);
317-
// order of templates are highest order first, we have to iterate through them all though
318-
String defaultPipeline = null;
319-
for (IndexTemplateMetaData template : templates) {
312+
// order of templates are highest order first
313+
for (final IndexTemplateMetaData template : templates) {
320314
final Settings settings = template.settings();
321-
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
322-
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
323-
requestCanOverridePipeline = false;
324-
// we can not break in case a lower-order template has a default pipeline that we need to reject
325-
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
315+
if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
326316
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
327-
// we can not break in case a lower-order template has a required pipeline that we need to reject
317+
// we can not break in case a lower-order template has a final pipeline that we need to collect
318+
}
319+
if (finalPipeline == null && IndexSettings.FINAL_PIPELINE.exists(settings)) {
320+
finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings);
321+
// we can not break in case a lower-order template has a default pipeline that we need to collect
322+
}
323+
if (defaultPipeline != null && finalPipeline != null) {
324+
// we can break if we have already collected a default and final pipeline
325+
break;
328326
}
329327
}
330-
if (requiredPipeline != null && defaultPipeline != null) {
331-
// we can not have picked up a required and a default pipeline from applying templates
332-
final String message = String.format(
333-
Locale.ROOT,
334-
"required pipeline [%s] and default pipeline [%s] can not both be set",
335-
requiredPipeline,
336-
defaultPipeline);
337-
throw new IllegalArgumentException(message);
338-
}
339-
final String pipeline;
340-
if (requiredPipeline != null) {
341-
pipeline = requiredPipeline;
342-
} else {
343-
pipeline = Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME);
344-
}
345-
indexRequest.setPipeline(pipeline);
328+
indexRequest.setPipeline(Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME));
329+
indexRequest.setFinalPipeline(Objects.requireNonNullElse(finalPipeline, IngestService.NOOP_PIPELINE_NAME));
346330
}
347331

348332
if (requestPipeline != null) {
349-
if (requestCanOverridePipeline == false) {
350-
final String message = String.format(
351-
Locale.ROOT,
352-
"request pipeline [%s] can not override required pipeline [%s]",
353-
requestPipeline,
354-
requiredPipeline);
355-
throw new IllegalArgumentException(message);
356-
} else {
357-
indexRequest.setPipeline(requestPipeline);
358-
}
333+
indexRequest.setPipeline(requestPipeline);
359334
}
360335

361336
/*
@@ -371,8 +346,10 @@ static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest<?> originalReque
371346
indexRequest.isPipelineResolved(true);
372347
}
373348

374-
// Return whether this index request has a pipeline
375-
return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false;
349+
350+
// return whether this index request has a pipeline
351+
return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false
352+
|| IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false;
376353
}
377354

378355
boolean needToCheck() {

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

+30
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
9898
private XContentType contentType;
9999

100100
private String pipeline;
101+
private String finalPipeline;
101102

102103
private boolean isPipelineResolved;
103104

@@ -126,6 +127,9 @@ public IndexRequest(StreamInput in) throws IOException {
126127
version = in.readLong();
127128
versionType = VersionType.fromValue(in.readByte());
128129
pipeline = in.readOptionalString();
130+
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
131+
finalPipeline = in.readOptionalString();
132+
}
129133
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
130134
isPipelineResolved = in.readBoolean();
131135
}
@@ -202,6 +206,9 @@ public ActionRequestValidationException validate() {
202206
validationException = addValidationError("pipeline cannot be an empty string", validationException);
203207
}
204208

209+
if (finalPipeline != null && finalPipeline.isEmpty()) {
210+
validationException = addValidationError("final pipeline cannot be an empty string", validationException);
211+
}
205212

206213
return validationException;
207214
}
@@ -268,6 +275,26 @@ public String getPipeline() {
268275
return this.pipeline;
269276
}
270277

278+
/**
279+
* Sets the final ingest pipeline to be executed before indexing the document.
280+
*
281+
* @param finalPipeline the name of the final pipeline
282+
* @return this index request
283+
*/
284+
public IndexRequest setFinalPipeline(final String finalPipeline) {
285+
this.finalPipeline = finalPipeline;
286+
return this;
287+
}
288+
289+
/**
290+
* Returns the final ingest pipeline to be executed before indexing the document.
291+
*
292+
* @return the name of the final pipeline
293+
*/
294+
public String getFinalPipeline() {
295+
return this.finalPipeline;
296+
}
297+
271298
/**
272299
* Sets if the pipeline for this request has been resolved by the coordinating node.
273300
*
@@ -589,6 +616,9 @@ public void writeTo(StreamOutput out) throws IOException {
589616
out.writeLong(version);
590617
out.writeByte(versionType.getValue());
591618
out.writeOptionalString(pipeline);
619+
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
620+
out.writeOptionalString(finalPipeline);
621+
}
592622
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
593623
out.writeBoolean(isPipelineResolved);
594624
}

server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
161161
EngineConfig.INDEX_CODEC_SETTING,
162162
IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
163163
IndexSettings.DEFAULT_PIPELINE,
164-
IndexSettings.REQUIRED_PIPELINE,
164+
IndexSettings.FINAL_PIPELINE,
165165
MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
166166

167167
// validate that built-in similarities don't get redefined

0 commit comments

Comments
 (0)