Skip to content

Commit ff8fd67

Browse files
committed
mappings: update cluster state with type mapping also for failed indexing request
When indexing of a document with a type that is not in the mappings fails, for example because "dynamic": "strict" but doc contains a new field, then the type is still created on the node that executed the indexing request. However, the change was never added to the cluster state. This commit makes sure mapping updates are always added to the cluster state even if indexing of a document fails. closes #8692 relates to #8650
1 parent 4ee7ed9 commit ff8fd67

File tree

7 files changed

+276
-63
lines changed

7 files changed

+276
-63
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action;
21+
22+
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.ElasticsearchWrapperException;
24+
import org.elasticsearch.common.Nullable;
25+
26+
27+
public class WriteFailureException extends ElasticsearchException implements ElasticsearchWrapperException {
28+
@Nullable
29+
private final String mappingTypeToUpdate;
30+
31+
public WriteFailureException(Throwable cause, String mappingTypeToUpdate) {
32+
super(null, cause);
33+
assert cause != null;
34+
this.mappingTypeToUpdate = mappingTypeToUpdate;
35+
}
36+
37+
public String getMappingTypeToUpdate() {
38+
return mappingTypeToUpdate;
39+
}
40+
}

src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@
2222
import com.google.common.collect.Sets;
2323
import org.elasticsearch.ElasticsearchException;
2424
import org.elasticsearch.ElasticsearchIllegalStateException;
25-
import org.elasticsearch.ElasticsearchWrapperException;
2625
import org.elasticsearch.ExceptionsHelper;
2726
import org.elasticsearch.action.ActionRequest;
2827
import org.elasticsearch.action.ActionWriteResponse;
2928
import org.elasticsearch.action.RoutingMissingException;
29+
import org.elasticsearch.action.WriteFailureException;
3030
import org.elasticsearch.action.delete.DeleteRequest;
3131
import org.elasticsearch.action.delete.DeleteResponse;
3232
import org.elasticsearch.action.index.IndexRequest;
@@ -42,7 +42,6 @@
4242
import org.elasticsearch.cluster.action.shard.ShardStateAction;
4343
import org.elasticsearch.cluster.metadata.MappingMetaData;
4444
import org.elasticsearch.cluster.routing.ShardIterator;
45-
import org.elasticsearch.common.Nullable;
4645
import org.elasticsearch.common.bytes.BytesReference;
4746
import org.elasticsearch.common.collect.Tuple;
4847
import org.elasticsearch.common.inject.Inject;
@@ -161,9 +160,9 @@ protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(Clu
161160
}
162161
ops[requestIndex] = result.op;
163162
}
164-
} catch (WriteFailure e) {
165-
if (e.mappingTypeToUpdate != null) {
166-
mappingTypesToUpdate.add(e.mappingTypeToUpdate);
163+
} catch (WriteFailureException e) {
164+
if (e.getMappingTypeToUpdate() != null) {
165+
mappingTypesToUpdate.add(e.getMappingTypeToUpdate());
167166
}
168167
throw e.getCause();
169168
}
@@ -397,17 +396,6 @@ <T extends ActionWriteResponse> T response() {
397396

398397
}
399398

400-
static class WriteFailure extends ElasticsearchException implements ElasticsearchWrapperException {
401-
@Nullable
402-
final String mappingTypeToUpdate;
403-
404-
WriteFailure(Throwable cause, String mappingTypeToUpdate) {
405-
super(null, cause);
406-
assert cause != null;
407-
this.mappingTypeToUpdate = mappingTypeToUpdate;
408-
}
409-
}
410-
411399
private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
412400
IndexShard indexShard, boolean processed) {
413401

@@ -457,7 +445,7 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
457445
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
458446
indexRequest.version(version);
459447
} catch (Throwable t) {
460-
throw new WriteFailure(t, mappingTypeToUpdate);
448+
throw new WriteFailureException(t, mappingTypeToUpdate);
461449
}
462450

463451
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());

src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.ExceptionsHelper;
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.action.RoutingMissingException;
25+
import org.elasticsearch.action.WriteFailureException;
2526
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
2627
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
2728
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
@@ -40,6 +41,7 @@
4041
import org.elasticsearch.common.inject.Inject;
4142
import org.elasticsearch.common.settings.Settings;
4243
import org.elasticsearch.index.engine.Engine;
44+
import org.elasticsearch.index.mapper.DocumentMapper;
4345
import org.elasticsearch.index.mapper.SourceToParse;
4446
import org.elasticsearch.index.IndexService;
4547
import org.elasticsearch.index.shard.IndexShard;
@@ -167,7 +169,7 @@ protected ShardIterator shards(ClusterState clusterState, InternalRequest reques
167169
}
168170

169171
@Override
170-
protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
172+
protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
171173
final IndexRequest request = shardRequest.request;
172174

173175
// validate, if routing is required, that we got routing
@@ -185,39 +187,49 @@ protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterStat
185187
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
186188
long version;
187189
boolean created;
188-
if (request.opType() == IndexRequest.OpType.INDEX) {
189-
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
190-
if (index.parsedDoc().mappingsModified()) {
191-
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());
192-
}
193-
indexShard.index(index);
194-
version = index.version();
195-
created = index.created();
196-
} else {
197-
Engine.Create create = indexShard.prepareCreate(sourceToParse,
198-
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
199-
if (create.parsedDoc().mappingsModified()) {
200-
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());
190+
191+
try {
192+
if (request.opType() == IndexRequest.OpType.INDEX) {
193+
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
194+
if (index.parsedDoc().mappingsModified()) {
195+
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());
196+
}
197+
indexShard.index(index);
198+
version = index.version();
199+
created = index.created();
200+
} else {
201+
Engine.Create create = indexShard.prepareCreate(sourceToParse,
202+
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
203+
if (create.parsedDoc().mappingsModified()) {
204+
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());
205+
}
206+
indexShard.create(create);
207+
version = create.version();
208+
created = true;
201209
}
202-
indexShard.create(create);
203-
version = create.version();
204-
created = true;
205-
}
206-
if (request.refresh()) {
207-
try {
208-
indexShard.refresh("refresh_flag_index");
209-
} catch (Throwable e) {
210-
// ignore
210+
if (request.refresh()) {
211+
try {
212+
indexShard.refresh("refresh_flag_index");
213+
} catch (Throwable e) {
214+
// ignore
215+
}
211216
}
212-
}
213-
214-
// update the version on the request, so it will be used for the replicas
215-
request.version(version);
216-
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
217217

218-
assert request.versionType().validateVersionForWrites(request.version());
218+
// update the version on the request, so it will be used for the replicas
219+
request.version(version);
220+
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
219221

220-
return new Tuple<>(new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created), shardRequest.request);
222+
assert request.versionType().validateVersionForWrites(request.version());
223+
return new Tuple<>(new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created), shardRequest.request);
224+
} catch (WriteFailureException e) {
225+
if (e.getMappingTypeToUpdate() != null){
226+
DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate());
227+
if (docMapper != null) {
228+
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());
229+
}
230+
}
231+
throw e.getCause();
232+
}
221233
}
222234

223235
@Override

src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
117117
* @return A tuple containing not null values, as first value the result of the primary operation and as second value
118118
* the request to be executed on the replica shards.
119119
*/
120-
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest);
120+
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
121121

122122
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest);
123123

src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import org.elasticsearch.ElasticsearchIllegalStateException;
3535
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
3636
import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
37-
import org.elasticsearch.cluster.metadata.IndexMetaData;
37+
import org.elasticsearch.action.WriteFailureException;
3838
import org.elasticsearch.cluster.routing.ShardRouting;
3939
import org.elasticsearch.cluster.routing.ShardRoutingState;
4040
import org.elasticsearch.common.Booleans;
@@ -64,6 +64,12 @@
6464
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
6565
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
6666
import org.elasticsearch.index.engine.*;
67+
import org.elasticsearch.index.engine.Engine;
68+
import org.elasticsearch.index.engine.EngineClosedException;
69+
import org.elasticsearch.index.engine.EngineException;
70+
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
71+
import org.elasticsearch.index.engine.RefreshFailedEngineException;
72+
import org.elasticsearch.index.engine.SegmentsStats;
6773
import org.elasticsearch.index.fielddata.FieldDataStats;
6874
import org.elasticsearch.index.fielddata.IndexFieldDataService;
6975
import org.elasticsearch.index.fielddata.ShardFieldData;
@@ -409,8 +415,16 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
409415
public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) throws ElasticsearchException {
410416
long startTime = System.nanoTime();
411417
Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(source.type());
412-
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
413-
return new Engine.Create(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId);
418+
try {
419+
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
420+
return new Engine.Create(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId);
421+
} catch (Throwable t) {
422+
if (docMapper.v2()) {
423+
throw new WriteFailureException(t, docMapper.v1().type());
424+
} else {
425+
throw t;
426+
}
427+
}
414428
}
415429

416430
public ParsedDocument create(Engine.Create create) throws ElasticsearchException {
@@ -434,8 +448,16 @@ public ParsedDocument create(Engine.Create create) throws ElasticsearchException
434448
public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException {
435449
long startTime = System.nanoTime();
436450
Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(source.type());
437-
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
438-
return new Engine.Index(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates);
451+
try {
452+
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
453+
return new Engine.Index(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates);
454+
} catch (Throwable t) {
455+
if (docMapper.v2()) {
456+
throw new WriteFailureException(t, docMapper.v1().type());
457+
} else {
458+
throw t;
459+
}
460+
}
439461
}
440462

441463
public ParsedDocument index(Engine.Index index) throws ElasticsearchException {
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.index.mapper.dynamic;
20+
21+
import com.google.common.base.Predicate;
22+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
23+
import org.elasticsearch.client.Client;
24+
import org.elasticsearch.index.mapper.StrictDynamicMappingException;
25+
import org.elasticsearch.test.ElasticsearchIntegrationTest;
26+
import org.junit.Test;
27+
28+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
29+
30+
31+
public class DynamicMappingIntegrationTests extends ElasticsearchIntegrationTest {
32+
33+
// https://github.com/elasticsearch/elasticsearch/issues/8423#issuecomment-64229717
34+
@Test
35+
public void testStrictAllMapping() throws Exception {
36+
String defaultMapping = jsonBuilder().startObject().startObject("_default_")
37+
.field("dynamic", "strict")
38+
.endObject().endObject().string();
39+
client().admin().indices().prepareCreate("index").addMapping("_default_", defaultMapping).get();
40+
41+
try {
42+
client().prepareIndex("index", "type", "id").setSource("test", "test").get();
43+
fail();
44+
} catch (StrictDynamicMappingException ex) {
45+
// this should not be created dynamically so we expect this exception
46+
}
47+
awaitBusy(new Predicate<Object>() {
48+
@Override
49+
public boolean apply(java.lang.Object input) {
50+
GetMappingsResponse currentMapping = client().admin().indices().prepareGetMappings("index").get();
51+
return currentMapping.getMappings().get("index").get("type") != null;
52+
}
53+
});
54+
55+
String docMapping = jsonBuilder().startObject().startObject("type")
56+
.startObject("_all")
57+
.field("enabled", false)
58+
.endObject()
59+
.endObject().endObject().string();
60+
try {
61+
client().admin().indices()
62+
.preparePutMapping("index")
63+
.setType("type")
64+
.setSource(docMapping).get();
65+
fail();
66+
} catch (Exception e) {
67+
// the mapping was created anyway with _all enabled: true, although the index request fails so we expect the update to fail
68+
}
69+
70+
// make sure type was created
71+
for (Client client : cluster()) {
72+
GetMappingsResponse mapping = client.admin().indices().prepareGetMappings("index").setLocal(true).get();
73+
assertNotNull(mapping.getMappings().get("index").get("type"));
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)