Skip to content

Commit 39812b9

Browse files
committed
mappings: update cluster state with type mapping also for failed indexing request
When indexing of a document with a type that is not in the mappings fails, for example because "dynamic": "strict" but doc contains a new field, then the type is still created on the node that executed the indexing request. However, the change was never added to the cluster state. This commit makes sure mapping updates are always added to the cluster state even if indexing of a document fails. closes #8692 relates to #8650
1 parent bd7c6bb commit 39812b9

File tree

7 files changed

+281
-66
lines changed

7 files changed

+281
-66
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action;
21+
22+
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.ElasticsearchWrapperException;
24+
import org.elasticsearch.common.Nullable;
25+
26+
27+
public class WriteFailureException extends ElasticsearchException implements ElasticsearchWrapperException {
28+
@Nullable
29+
private final String mappingTypeToUpdate;
30+
31+
public WriteFailureException(Throwable cause, String mappingTypeToUpdate) {
32+
super(null, cause);
33+
assert cause != null;
34+
this.mappingTypeToUpdate = mappingTypeToUpdate;
35+
}
36+
37+
public String getMappingTypeToUpdate() {
38+
return mappingTypeToUpdate;
39+
}
40+
}

src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import com.google.common.collect.Sets;
2323
import org.elasticsearch.ElasticsearchException;
2424
import org.elasticsearch.ElasticsearchIllegalStateException;
25-
import org.elasticsearch.ElasticsearchWrapperException;
2625
import org.elasticsearch.ExceptionsHelper;
2726
import org.elasticsearch.action.ActionRequest;
2827
import org.elasticsearch.action.RoutingMissingException;
28+
import org.elasticsearch.action.WriteFailureException;
2929
import org.elasticsearch.action.delete.DeleteRequest;
3030
import org.elasticsearch.action.delete.DeleteResponse;
3131
import org.elasticsearch.action.index.IndexRequest;
@@ -41,7 +41,6 @@
4141
import org.elasticsearch.cluster.action.shard.ShardStateAction;
4242
import org.elasticsearch.cluster.metadata.MappingMetaData;
4343
import org.elasticsearch.cluster.routing.ShardIterator;
44-
import org.elasticsearch.common.Nullable;
4544
import org.elasticsearch.common.bytes.BytesReference;
4645
import org.elasticsearch.common.collect.Tuple;
4746
import org.elasticsearch.common.inject.Inject;
@@ -160,9 +159,9 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
160159
}
161160
ops[requestIndex] = result.op;
162161
}
163-
} catch (WriteFailure e) {
164-
if (e.mappingTypeToUpdate != null) {
165-
mappingTypesToUpdate.add(e.mappingTypeToUpdate);
162+
} catch (WriteFailureException e) {
163+
if (e.getMappingTypeToUpdate() != null) {
164+
mappingTypesToUpdate.add(e.getMappingTypeToUpdate());
166165
}
167166
throw e.getCause();
168167
}
@@ -395,17 +394,6 @@ <T> T response() {
395394

396395
}
397396

398-
static class WriteFailure extends ElasticsearchException implements ElasticsearchWrapperException {
399-
@Nullable
400-
final String mappingTypeToUpdate;
401-
402-
WriteFailure(Throwable cause, String mappingTypeToUpdate) {
403-
super(null, cause);
404-
assert cause != null;
405-
this.mappingTypeToUpdate = mappingTypeToUpdate;
406-
}
407-
}
408-
409397
private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
410398
IndexShard indexShard, boolean processed) {
411399

@@ -455,7 +443,7 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
455443
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
456444
indexRequest.version(version);
457445
} catch (Throwable t) {
458-
throw new WriteFailure(t, mappingTypeToUpdate);
446+
throw new WriteFailureException(t, mappingTypeToUpdate);
459447
}
460448

461449
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());

src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

Lines changed: 45 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.ExceptionsHelper;
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.action.RoutingMissingException;
25+
import org.elasticsearch.action.WriteFailureException;
2526
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
2627
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
2728
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
@@ -39,6 +40,7 @@
3940
import org.elasticsearch.common.inject.Inject;
4041
import org.elasticsearch.common.settings.Settings;
4142
import org.elasticsearch.index.engine.Engine;
43+
import org.elasticsearch.index.mapper.DocumentMapper;
4244
import org.elasticsearch.index.mapper.SourceToParse;
4345
import org.elasticsearch.index.service.IndexService;
4446
import org.elasticsearch.index.shard.service.IndexShard;
@@ -166,7 +168,7 @@ protected ShardIterator shards(ClusterState clusterState, InternalRequest reques
166168
}
167169

168170
@Override
169-
protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
171+
protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
170172
final IndexRequest request = shardRequest.request;
171173

172174
// validate, if routing is required, that we got routing
@@ -184,43 +186,52 @@ protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(C
184186
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
185187
long version;
186188
boolean created;
187-
Engine.IndexingOperation op;
188-
if (request.opType() == IndexRequest.OpType.INDEX) {
189-
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
190-
if (index.parsedDoc().mappingsModified()) {
191-
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());
192-
}
193-
indexShard.index(index);
194-
version = index.version();
195-
op = index;
196-
created = index.created();
197-
} else {
198-
Engine.Create create = indexShard.prepareCreate(sourceToParse,
199-
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
200-
if (create.parsedDoc().mappingsModified()) {
201-
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());
189+
try {
190+
Engine.IndexingOperation op;
191+
if (request.opType() == IndexRequest.OpType.INDEX) {
192+
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
193+
if (index.parsedDoc().mappingsModified()) {
194+
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());
195+
}
196+
indexShard.index(index);
197+
version = index.version();
198+
op = index;
199+
created = index.created();
200+
} else {
201+
Engine.Create create = indexShard.prepareCreate(sourceToParse,
202+
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
203+
if (create.parsedDoc().mappingsModified()) {
204+
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());
205+
}
206+
indexShard.create(create);
207+
version = create.version();
208+
op = create;
209+
created = true;
202210
}
203-
indexShard.create(create);
204-
version = create.version();
205-
op = create;
206-
created = true;
207-
}
208-
if (request.refresh()) {
209-
try {
210-
indexShard.refresh(new Engine.Refresh("refresh_flag_index").force(false));
211-
} catch (Throwable e) {
212-
// ignore
211+
if (request.refresh()) {
212+
try {
213+
indexShard.refresh(new Engine.Refresh("refresh_flag_index").force(false));
214+
} catch (Throwable e) {
215+
// ignore
216+
}
213217
}
214-
}
218+
// update the version on the request, so it will be used for the replicas
219+
request.version(version);
220+
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
221+
assert request.versionType().validateVersionForWrites(request.version());
215222

216-
// update the version on the request, so it will be used for the replicas
217-
request.version(version);
218-
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
223+
IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created);
224+
return new PrimaryResponse<>(shardRequest.request, response, op);
219225

220-
assert request.versionType().validateVersionForWrites(request.version());
221-
222-
IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created);
223-
return new PrimaryResponse<>(shardRequest.request, response, op);
226+
} catch (WriteFailureException e) {
227+
if (e.getMappingTypeToUpdate() != null) {
228+
DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate());
229+
if (docMapper != null) {
230+
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());
231+
}
232+
}
233+
throw e.getCause();
234+
}
224235
}
225236

226237
@Override

src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
105105

106106
protected abstract String executor();
107107

108-
protected abstract PrimaryResponse<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest);
108+
protected abstract PrimaryResponse<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
109109

110110
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest);
111111

src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.ElasticsearchException;
3030
import org.elasticsearch.ElasticsearchIllegalArgumentException;
3131
import org.elasticsearch.ElasticsearchIllegalStateException;
32+
import org.elasticsearch.action.WriteFailureException;
3233
import org.elasticsearch.cluster.routing.ShardRouting;
3334
import org.elasticsearch.cluster.routing.ShardRoutingState;
3435
import org.elasticsearch.common.Booleans;
@@ -55,6 +56,12 @@
5556
import org.elasticsearch.index.codec.CodecService;
5657
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
5758
import org.elasticsearch.index.engine.*;
59+
import org.elasticsearch.index.engine.Engine;
60+
import org.elasticsearch.index.engine.EngineClosedException;
61+
import org.elasticsearch.index.engine.EngineException;
62+
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
63+
import org.elasticsearch.index.engine.RefreshFailedEngineException;
64+
import org.elasticsearch.index.engine.SegmentsStats;
5865
import org.elasticsearch.index.fielddata.FieldDataStats;
5966
import org.elasticsearch.index.fielddata.IndexFieldDataService;
6067
import org.elasticsearch.index.fielddata.ShardFieldData;
@@ -389,8 +396,16 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
389396
public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) throws ElasticsearchException {
390397
long startTime = System.nanoTime();
391398
Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(source.type());
392-
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
393-
return new Engine.Create(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId);
399+
try {
400+
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
401+
return new Engine.Create(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId);
402+
} catch (Throwable t) {
403+
if (docMapper.v2()) {
404+
throw new WriteFailureException(t, docMapper.v1().type());
405+
} else {
406+
throw t;
407+
}
408+
}
394409
}
395410

396411
@Override
@@ -410,8 +425,16 @@ public ParsedDocument create(Engine.Create create) throws ElasticsearchException
410425
public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException {
411426
long startTime = System.nanoTime();
412427
Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(source.type());
413-
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
414-
return new Engine.Index(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates);
428+
try {
429+
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
430+
return new Engine.Index(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates);
431+
} catch (Throwable t) {
432+
if (docMapper.v2()) {
433+
throw new WriteFailureException(t, docMapper.v1().type());
434+
} else {
435+
throw t;
436+
}
437+
}
415438
}
416439

417440
@Override
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.index.mapper.dynamic;
20+
21+
import com.google.common.base.Predicate;
22+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
23+
import org.elasticsearch.client.Client;
24+
import org.elasticsearch.index.mapper.StrictDynamicMappingException;
25+
import org.elasticsearch.test.ElasticsearchIntegrationTest;
26+
import org.junit.Test;
27+
28+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
29+
30+
31+
public class DynamicMappingIntegrationTests extends ElasticsearchIntegrationTest {
32+
33+
// https://github.com/elasticsearch/elasticsearch/issues/8423#issuecomment-64229717
34+
@Test
35+
public void testStrictAllMapping() throws Exception {
36+
String defaultMapping = jsonBuilder().startObject().startObject("_default_")
37+
.field("dynamic", "strict")
38+
.endObject().endObject().string();
39+
client().admin().indices().prepareCreate("index").addMapping("_default_", defaultMapping).get();
40+
41+
try {
42+
client().prepareIndex("index", "type", "id").setSource("test", "test").get();
43+
fail();
44+
} catch (StrictDynamicMappingException ex) {
45+
// this should not be created dynamically so we expect this exception
46+
}
47+
awaitBusy(new Predicate<Object>() {
48+
@Override
49+
public boolean apply(java.lang.Object input) {
50+
GetMappingsResponse currentMapping = client().admin().indices().prepareGetMappings("index").get();
51+
return currentMapping.getMappings().get("index").get("type") != null;
52+
}
53+
});
54+
55+
String docMapping = jsonBuilder().startObject().startObject("type")
56+
.startObject("_all")
57+
.field("enabled", false)
58+
.endObject()
59+
.endObject().endObject().string();
60+
try {
61+
client().admin().indices()
62+
.preparePutMapping("index")
63+
.setType("type")
64+
.setSource(docMapping).get();
65+
fail();
66+
} catch (Exception e) {
67+
// the mapping was created anyway with _all enabled: true, although the index request fails so we expect the update to fail
68+
}
69+
70+
// make sure type was created
71+
for (Client client : cluster()) {
72+
GetMappingsResponse mapping = client.admin().indices().prepareGetMappings("index").setLocal(true).get();
73+
assertNotNull(mapping.getMappings().get("index").get("type"));
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)