Skip to content

Commit 66ec358

Browse files
authored
Execute ingest node pipeline before creating the index (#39607)
Prior to this commit (and after 6.5.0), if an ingest node changes the _index in a pipeline, the original target index would be created. For daily indexes this could create an extra, empty index per day. This commit changes the TransportBulkAction to execute the ingest node pipeline before attempting to create the index. This ensures that the only index created is the original or one set by the ingest node pipeline. This was the execution order prior to 6.5.0 (#32786). The execution order was changed in 6.5 to better support default pipelines. Specifically the execution order was changed to be able to read the settings from the index meta data. This commit also includes a change in logic such that if the target index does not exist when ingest node pipeline runs, it will now pull the default pipeline (if one exists) from the settings of the best matched of the index template. Relates #32786 Relates #32758 Closes #36545
1 parent cd9f8cc commit 66ec358

File tree

5 files changed

+252
-121
lines changed

5 files changed

+252
-121
lines changed

docs/reference/ingest/ingest-node.asciidoc

Lines changed: 0 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -271,28 +271,6 @@ POST test/_doc/1?pipeline=drop_guests_network
271271
// CONSOLE
272272
// TEST[continued]
273273

274-
////
275-
Hidden example assertion:
276-
[source,js]
277-
--------------------------------------------------
278-
GET test/_doc/1
279-
--------------------------------------------------
280-
// CONSOLE
281-
// TEST[continued]
282-
// TEST[catch:missing]
283-
284-
[source,js]
285-
--------------------------------------------------
286-
{
287-
"_index": "test",
288-
"_type": "_doc",
289-
"_id": "1",
290-
"found": false
291-
}
292-
--------------------------------------------------
293-
// TESTRESPONSE
294-
////
295-
296274
Thanks to the `?.` operator the following document will not throw an error.
297275
If the pipeline used a `.` the following document would throw a NullPointerException
298276
since the `network` object is not part of the source document.
@@ -392,28 +370,6 @@ POST test/_doc/3?pipeline=drop_guests_network
392370
// CONSOLE
393371
// TEST[continued]
394372

395-
////
396-
Hidden example assertion:
397-
[source,js]
398-
--------------------------------------------------
399-
GET test/_doc/3
400-
--------------------------------------------------
401-
// CONSOLE
402-
// TEST[continued]
403-
// TEST[catch:missing]
404-
405-
[source,js]
406-
--------------------------------------------------
407-
{
408-
"_index": "test",
409-
"_type": "_doc",
410-
"_id": "3",
411-
"found": false
412-
}
413-
--------------------------------------------------
414-
// TESTRESPONSE
415-
////
416-
417373
The `?.` operators works well for use in the `if` conditional
418374
because the {painless}/painless-operators-reference.html#null-safe-operator[null safe operator]
419375
returns null if the object is null and `==` is null safe (as well as many other
@@ -511,28 +467,6 @@ POST test/_doc/1?pipeline=not_prod_dropper
511467
The document is <<drop-processor,dropped>> since `prod` (case insensitive)
512468
is not found in the tags.
513469

514-
////
515-
Hidden example assertion:
516-
[source,js]
517-
--------------------------------------------------
518-
GET test/_doc/1
519-
--------------------------------------------------
520-
// CONSOLE
521-
// TEST[continued]
522-
// TEST[catch:missing]
523-
524-
[source,js]
525-
--------------------------------------------------
526-
{
527-
"_index": "test",
528-
"_type": "_doc",
529-
"_id": "1",
530-
"found": false
531-
}
532-
--------------------------------------------------
533-
// TESTRESPONSE
534-
////
535-
536470
The following document is indexed (i.e. not dropped) since
537471
`prod` (case insensitive) is found in the tags.
538472

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,4 @@ teardown:
9191
get:
9292
index: test
9393
id: 3
94-
- match: { found: false }
94+
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
---
2+
teardown:
3+
- do:
4+
ingest.delete_pipeline:
5+
id: "retarget"
6+
ignore: 404
7+
8+
- do:
9+
indices.delete:
10+
index: foo
11+
12+
---
13+
"Test Change Target Index with Explicit Pipeline":
14+
15+
- do:
16+
ingest.put_pipeline:
17+
id: "retarget"
18+
body: >
19+
{
20+
"processors": [
21+
{
22+
"set" : {
23+
"field" : "_index",
24+
"value" : "foo"
25+
}
26+
}
27+
]
28+
}
29+
- match: { acknowledged: true }
30+
31+
# no indices
32+
- do:
33+
cat.indices: {}
34+
35+
- match:
36+
$body: |
37+
/^$/
38+
39+
- do:
40+
index:
41+
index: test
42+
id: 1
43+
pipeline: "retarget"
44+
body: {
45+
a: true
46+
}
47+
48+
- do:
49+
get:
50+
index: foo
51+
id: 1
52+
- match: { _source.a: true }
53+
54+
# only the foo index
55+
- do:
56+
cat.indices:
57+
h: i
58+
59+
- match:
60+
$body: |
61+
/^foo\n$/
62+
63+
---
64+
"Test Change Target Index with Default Pipeline":
65+
66+
- do:
67+
indices.put_template:
68+
name: index_template
69+
body:
70+
index_patterns: test
71+
settings:
72+
default_pipeline: "retarget"
73+
74+
- do:
75+
ingest.put_pipeline:
76+
id: "retarget"
77+
body: >
78+
{
79+
"processors": [
80+
{
81+
"set" : {
82+
"field" : "_index",
83+
"value" : "foo"
84+
}
85+
}
86+
]
87+
}
88+
- match: { acknowledged: true }
89+
90+
# no indices
91+
- do:
92+
cat.indices: {}
93+
94+
- match:
95+
$body: |
96+
/^$/
97+
98+
- do:
99+
index:
100+
index: test
101+
id: 1
102+
body: {
103+
a: true
104+
}
105+
106+
- do:
107+
get:
108+
index: foo
109+
id: 1
110+
- match: { _source.a: true }
111+
112+
# only the foo index
113+
- do:
114+
cat.indices:
115+
h: i
116+
117+
- match:
118+
$body: |
119+
/^foo\n$/

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 73 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,14 @@
4747
import org.elasticsearch.cluster.metadata.AliasOrIndex;
4848
import org.elasticsearch.cluster.metadata.IndexMetaData;
4949
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
50+
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
5051
import org.elasticsearch.cluster.metadata.MappingMetaData;
5152
import org.elasticsearch.cluster.metadata.MetaData;
53+
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
5254
import org.elasticsearch.cluster.service.ClusterService;
5355
import org.elasticsearch.common.collect.ImmutableOpenMap;
5456
import org.elasticsearch.common.inject.Inject;
57+
import org.elasticsearch.common.settings.Settings;
5558
import org.elasticsearch.common.unit.TimeValue;
5659
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
5760
import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -151,6 +154,72 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
151154
final long startTime = relativeTime();
152155
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
153156

157+
boolean hasIndexRequestsWithPipelines = false;
158+
final MetaData metaData = clusterService.state().getMetaData();
159+
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
160+
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
161+
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
162+
if (indexRequest != null) {
163+
// get pipeline from request
164+
String pipeline = indexRequest.getPipeline();
165+
if (pipeline == null) {
166+
// start to look for default pipeline via settings found in the index meta data
167+
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
168+
if (indexMetaData == null && indexRequest.index() != null) {
169+
// if the write request if through an alias use the write index's meta data
170+
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
171+
if (indexOrAlias != null && indexOrAlias.isAlias()) {
172+
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
173+
indexMetaData = alias.getWriteIndex();
174+
}
175+
}
176+
if (indexMetaData != null) {
177+
// Find the the default pipeline if one is defined from and existing index.
178+
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
179+
indexRequest.setPipeline(defaultPipeline);
180+
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
181+
hasIndexRequestsWithPipelines = true;
182+
}
183+
} else if (indexRequest.index() != null) {
184+
// No index exists yet (and is valid request), so matching index templates to look for a default pipeline
185+
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
186+
assert (templates != null);
187+
String defaultPipeline = IngestService.NOOP_PIPELINE_NAME;
188+
// order of templates are highest order first, break if we find a default_pipeline
189+
for (IndexTemplateMetaData template : templates) {
190+
final Settings settings = template.settings();
191+
if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
192+
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
193+
break;
194+
}
195+
}
196+
indexRequest.setPipeline(defaultPipeline);
197+
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
198+
hasIndexRequestsWithPipelines = true;
199+
}
200+
}
201+
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
202+
hasIndexRequestsWithPipelines = true;
203+
}
204+
}
205+
}
206+
207+
if (hasIndexRequestsWithPipelines) {
208+
// this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
209+
// also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
210+
// this path is never taken.
211+
try {
212+
if (clusterService.localNode().isIngestNode()) {
213+
processBulkIndexIngestRequest(task, bulkRequest, listener);
214+
} else {
215+
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
216+
}
217+
} catch (Exception e) {
218+
listener.onFailure(e);
219+
}
220+
return;
221+
}
222+
154223
if (needToCheck()) {
155224
// Attempt to create all the indices that we're going to need during the bulk before we start.
156225
// Step 1: collect all the indices in the request
@@ -181,15 +250,15 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
181250
}
182251
// Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
183252
if (autoCreateIndices.isEmpty()) {
184-
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
253+
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
185254
} else {
186255
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
187256
for (String index : autoCreateIndices) {
188257
createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
189258
@Override
190259
public void onResponse(CreateIndexResponse result) {
191260
if (counter.decrementAndGet() == 0) {
192-
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
261+
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
193262
}
194263
}
195264

@@ -205,7 +274,7 @@ public void onFailure(Exception e) {
205274
}
206275
}
207276
if (counter.decrementAndGet() == 0) {
208-
executeIngestAndBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
277+
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
209278
inner.addSuppressed(e);
210279
listener.onFailure(inner);
211280
}), responses, indicesThatCannotBeCreated);
@@ -215,56 +284,7 @@ public void onFailure(Exception e) {
215284
}
216285
}
217286
} else {
218-
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
219-
}
220-
}
221-
222-
private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos,
223-
final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses,
224-
Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
225-
boolean hasIndexRequestsWithPipelines = false;
226-
final MetaData metaData = clusterService.state().getMetaData();
227-
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
228-
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
229-
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
230-
if(indexRequest != null){
231-
String pipeline = indexRequest.getPipeline();
232-
if (pipeline == null) {
233-
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
234-
if (indexMetaData == null && indexRequest.index() != null) {
235-
//check the alias
236-
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
237-
if (indexOrAlias != null && indexOrAlias.isAlias()) {
238-
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
239-
indexMetaData = alias.getWriteIndex();
240-
}
241-
}
242-
if (indexMetaData == null) {
243-
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
244-
} else {
245-
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
246-
indexRequest.setPipeline(defaultPipeline);
247-
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
248-
hasIndexRequestsWithPipelines = true;
249-
}
250-
}
251-
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
252-
hasIndexRequestsWithPipelines = true;
253-
}
254-
}
255-
}
256-
if (hasIndexRequestsWithPipelines) {
257-
try {
258-
if (clusterService.localNode().isIngestNode()) {
259-
processBulkIndexIngestRequest(task, bulkRequest, listener);
260-
} else {
261-
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
262-
}
263-
} catch (Exception e) {
264-
listener.onFailure(e);
265-
}
266-
} else {
267-
executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated);
287+
executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
268288
}
269289
}
270290

0 commit comments

Comments
 (0)