Skip to content

Commit 2dc056b

Browse files
authored
Read the default pipeline for bulk upsert through an alias (elastic#41963) (elastic#42802)
This commit allows bulk upserts to correctly read the default pipeline for the concrete index that belongs to an alias. Bulk upserts are modeled differently from normal index requests such that the index request is a request inside of the update request. The update request (outer) contains the index or alias name is not part of the (inner) index request. This commit adds a secondary check against the update request (outer) if the index request (inner) does not find an alias.
1 parent a4e518b commit 2dc056b

File tree

3 files changed

+52
-7
lines changed

3 files changed

+52
-7
lines changed

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ teardown:
119119
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
120120
{"update":{"_id":"8","_index":"test"}}
121121
{"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
122+
{"update":{"_id":"6_alias","_index":"test_alias"}}
123+
{"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
124+
{"update":{"_id":"7_alias","_index":"test_alias"}}
125+
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
126+
{"update":{"_id":"8_alias","_index":"test_alias"}}
127+
{"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
122128
123129
- do:
124130
mget:
@@ -127,6 +133,9 @@ teardown:
127133
- { _index: "test", _id: "6" }
128134
- { _index: "test", _id: "7" }
129135
- { _index: "test", _id: "8" }
136+
- { _index: "test", _id: "6_alias" }
137+
- { _index: "test", _id: "7_alias" }
138+
- { _index: "test", _id: "8_alias" }
130139
- match: { docs.0._index: "test" }
131140
- match: { docs.0._id: "6" }
132141
- match: { docs.0._source.bytes_source_field: "1kb" }
@@ -141,6 +150,20 @@ teardown:
141150
- match: { docs.2._source.bytes_source_field: "3kb" }
142151
- match: { docs.2._source.bytes_target_field: 3072 }
143152
- match: { docs.2._source.ran_script: true }
153+
- match: { docs.3._index: "test" }
154+
- match: { docs.3._id: "6_alias" }
155+
- match: { docs.3._source.bytes_source_field: "1kb" }
156+
- match: { docs.3._source.bytes_target_field: 1024 }
157+
- is_false: docs.3._source.ran_script
158+
- match: { docs.4._index: "test" }
159+
- match: { docs.4._id: "7_alias" }
160+
- match: { docs.4._source.bytes_source_field: "2kb" }
161+
- match: { docs.4._source.bytes_target_field: 2048 }
162+
- match: { docs.5._index: "test" }
163+
- match: { docs.5._id: "8_alias" }
164+
- match: { docs.5._source.bytes_source_field: "3kb" }
165+
- match: { docs.5._source.bytes_target_field: 3072 }
166+
- match: { docs.5._source.ran_script: true }
144167

145168
# explicit no default pipeline
146169
- do:

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,22 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
165165
if (pipeline == null) {
166166
// start to look for default pipeline via settings found in the index meta data
167167
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
168+
// check the alias for the index request (this is how normal index requests are modeled)
168169
if (indexMetaData == null && indexRequest.index() != null) {
169-
// if the write request if through an alias use the write index's meta data
170170
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
171171
if (indexOrAlias != null && indexOrAlias.isAlias()) {
172172
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
173173
indexMetaData = alias.getWriteIndex();
174174
}
175175
}
176+
// check the alias for the action request (this is how upserts are modeled)
177+
if (indexMetaData == null && actionRequest.index() != null) {
178+
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(actionRequest.index());
179+
if (indexOrAlias != null && indexOrAlias.isAlias()) {
180+
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
181+
indexMetaData = alias.getWriteIndex();
182+
}
183+
}
176184
if (indexMetaData != null) {
177185
// Find the default pipeline if one is defined from and existing index.
178186
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.cluster.node.DiscoveryNode;
4141
import org.elasticsearch.cluster.node.DiscoveryNodes;
4242
import org.elasticsearch.cluster.service.ClusterService;
43+
import org.elasticsearch.common.Nullable;
4344
import org.elasticsearch.common.collect.ImmutableOpenMap;
4445
import org.elasticsearch.common.settings.ClusterSettings;
4546
import org.elasticsearch.common.settings.Settings;
@@ -412,16 +413,29 @@ public void testUseDefaultPipelineWithAlias() throws Exception {
412413
}
413414

414415
public void testUseDefaultPipelineWithBulkUpsert() throws Exception {
416+
String indexRequestName = randomFrom(new String[]{null, WITH_DEFAULT_PIPELINE, WITH_DEFAULT_PIPELINE_ALIAS});
417+
validatePipelineWithBulkUpsert(indexRequestName, WITH_DEFAULT_PIPELINE);
418+
}
419+
420+
public void testUseDefaultPipelineWithBulkUpsertWithAlias() throws Exception {
421+
String indexRequestName = randomFrom(new String[]{null, WITH_DEFAULT_PIPELINE, WITH_DEFAULT_PIPELINE_ALIAS});
422+
validatePipelineWithBulkUpsert(indexRequestName, WITH_DEFAULT_PIPELINE_ALIAS);
423+
}
424+
425+
private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexName, String updateRequestIndexName) throws Exception {
415426
Exception exception = new Exception("fake exception");
416427
BulkRequest bulkRequest = new BulkRequest();
417-
IndexRequest indexRequest1 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id1").source(Collections.emptyMap());
418-
IndexRequest indexRequest2 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id2").source(Collections.emptyMap());
419-
IndexRequest indexRequest3 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id3").source(Collections.emptyMap());
420-
UpdateRequest upsertRequest = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id1").upsert(indexRequest1).script(mockScript("1"));
421-
UpdateRequest docAsUpsertRequest = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id2").doc(indexRequest2).docAsUpsert(true);
428+
IndexRequest indexRequest1 = new IndexRequest(indexRequestIndexName, "type", "id1").source(Collections.emptyMap());
429+
IndexRequest indexRequest2 = new IndexRequest(indexRequestIndexName, "type", "id2").source(Collections.emptyMap());
430+
IndexRequest indexRequest3 = new IndexRequest(indexRequestIndexName, "type", "id3").source(Collections.emptyMap());
431+
UpdateRequest upsertRequest = new UpdateRequest(updateRequestIndexName, "type", "id1")
432+
.upsert(indexRequest1).script(mockScript("1"));
433+
UpdateRequest docAsUpsertRequest = new UpdateRequest(updateRequestIndexName, "type", "id2")
434+
.doc(indexRequest2).docAsUpsert(true);
422435
// this test only covers the mechanics that scripted bulk upserts will execute a default pipeline. However, in practice scripted
423436
// bulk upserts with a default pipeline are a bit surprising since the script executes AFTER the pipeline.
424-
UpdateRequest scriptedUpsert = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id2").upsert(indexRequest3).script(mockScript("1"))
437+
UpdateRequest scriptedUpsert = new UpdateRequest(updateRequestIndexName, "type", "id2")
438+
.upsert(indexRequest3).script(mockScript("1"))
425439
.scriptedUpsert(true);
426440
bulkRequest.add(upsertRequest).add(docAsUpsertRequest).add(scriptedUpsert);
427441

0 commit comments

Comments
 (0)