Skip to content

Commit 3429f79

Browse files
authored
Do not mutate request on scripted upsert (#49578)
Fixes a bug where a scripted upsert that causes a dynamic mapping update is retried (because mapping update is still in-flight), and the request is mutated multiple times. Closes #48670
1 parent 4013e81 commit 3429f79

File tree

2 files changed

+41
-5
lines changed

2 files changed

+41
-5
lines changed

server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,7 @@ protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult
9999
* Execute a scripted upsert, where there is an existing upsert document and a script to be executed. The script is executed and a new
100100
* Tuple of operation and updated {@code _source} is returned.
101101
*/
102-
Tuple<UpdateOpType, Map<String, Object>> executeScriptedUpsert(IndexRequest upsert, Script script, LongSupplier nowInMillis) {
103-
Map<String, Object> upsertDoc = upsert.sourceAsMap();
102+
Tuple<UpdateOpType, Map<String, Object>> executeScriptedUpsert(Map<String, Object> upsertDoc, Script script, LongSupplier nowInMillis) {
104103
Map<String, Object> ctx = new HashMap<>(3);
105104
// Tell the script that this is a create and not an update
106105
ctx.put(ContextFields.OP, UpdateOpType.CREATE.toString());
@@ -133,11 +132,11 @@ Result prepareUpsert(ShardId shardId, UpdateRequest request, final GetResult get
133132
if (request.scriptedUpsert() && request.script() != null) {
134133
// Run the script to perform the create logic
135134
IndexRequest upsert = request.upsertRequest();
136-
Tuple<UpdateOpType, Map<String, Object>> upsertResult = executeScriptedUpsert(upsert, request.script, nowInMillis);
135+
Tuple<UpdateOpType, Map<String, Object>> upsertResult = executeScriptedUpsert(upsert.sourceAsMap(), request.script,
136+
nowInMillis);
137137
switch (upsertResult.v1()) {
138138
case CREATE:
139-
// Update the index request with the new "_source"
140-
indexRequest.source(upsertResult.v2());
139+
indexRequest = Requests.indexRequest(request.index()).source(upsertResult.v2());
141140
break;
142141
case NONE:
143142
UpdateResponse update = new UpdateResponse(shardId, getResult.getId(),

server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,43 @@ public void testBulkUpdateSimple() throws Exception {
199199
assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(4L));
200200
}
201201

202+
public void testBulkUpdateWithScriptedUpsertAndDynamicMappingUpdate() throws Exception {
203+
assertAcked(prepareCreate("test").addAlias(new Alias("alias")));
204+
ensureGreen();
205+
206+
final Script script = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "ctx._source.field += 1", Collections.emptyMap());
207+
208+
BulkResponse bulkResponse = client().prepareBulk()
209+
.add(client().prepareUpdate().setIndex(indexOrAlias()).setId("1")
210+
.setScript(script).setScriptedUpsert(true).setUpsert("field", 1))
211+
.add(client().prepareUpdate().setIndex(indexOrAlias()).setId("2")
212+
.setScript(script).setScriptedUpsert(true).setUpsert("field", 1))
213+
.get();
214+
215+
logger.info(bulkResponse.buildFailureMessage());
216+
217+
assertThat(bulkResponse.hasFailures(), equalTo(false));
218+
assertThat(bulkResponse.getItems().length, equalTo(2));
219+
for (BulkItemResponse bulkItemResponse : bulkResponse) {
220+
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
221+
}
222+
assertThat(bulkResponse.getItems()[0].getResponse().getId(), equalTo("1"));
223+
assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L));
224+
assertThat(bulkResponse.getItems()[1].getResponse().getId(), equalTo("2"));
225+
assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(1L));
226+
227+
GetResponse getResponse = client().prepareGet().setIndex("test").setId("1").execute()
228+
.actionGet();
229+
assertThat(getResponse.isExists(), equalTo(true));
230+
assertThat(getResponse.getVersion(), equalTo(1L));
231+
assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(2L));
232+
233+
getResponse = client().prepareGet().setIndex("test").setId("2").execute().actionGet();
234+
assertThat(getResponse.isExists(), equalTo(true));
235+
assertThat(getResponse.getVersion(), equalTo(1L));
236+
assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(2L));
237+
}
238+
202239
public void testBulkWithCAS() throws Exception {
203240
createIndex("test", Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).build());
204241
ensureGreen();

0 commit comments

Comments
 (0)