Skip to content

Commit 2478a96

Browse files
committed
mappings: update cluster state with type mapping also for failed indexing request
When indexing of a document with a type that is not in the mappings fails, for example because "dynamic": "strict" but doc contains a new field, then the type is still created on the node that executed the indexing request. However, the change was never added to the cluster state. This commit makes sure mapping updates are always added to the cluster state even if indexing of a document fails. closes #8692 relates to #8650
1 parent 2bb9db3 commit 2478a96

File tree

7 files changed

+279
-66
lines changed

7 files changed

+279
-66
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action;
21+
22+
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.ElasticsearchWrapperException;
24+
import org.elasticsearch.common.Nullable;
25+
26+
27+
public class WriteFailureException extends ElasticsearchException implements ElasticsearchWrapperException {
28+
@Nullable
29+
private final String mappingTypeToUpdate;
30+
31+
public WriteFailureException(Throwable cause, String mappingTypeToUpdate) {
32+
super(null, cause);
33+
assert cause != null;
34+
this.mappingTypeToUpdate = mappingTypeToUpdate;
35+
}
36+
37+
public String getMappingTypeToUpdate() {
38+
return mappingTypeToUpdate;
39+
}
40+
}

src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import com.google.common.collect.Sets;
2323
import org.elasticsearch.ElasticsearchException;
2424
import org.elasticsearch.ElasticsearchIllegalStateException;
25-
import org.elasticsearch.ElasticsearchWrapperException;
2625
import org.elasticsearch.ExceptionsHelper;
2726
import org.elasticsearch.action.ActionRequest;
2827
import org.elasticsearch.action.RoutingMissingException;
28+
import org.elasticsearch.action.WriteFailureException;
2929
import org.elasticsearch.action.delete.DeleteRequest;
3030
import org.elasticsearch.action.delete.DeleteResponse;
3131
import org.elasticsearch.action.index.IndexRequest;
@@ -41,7 +41,6 @@
4141
import org.elasticsearch.cluster.action.shard.ShardStateAction;
4242
import org.elasticsearch.cluster.metadata.MappingMetaData;
4343
import org.elasticsearch.cluster.routing.ShardIterator;
44-
import org.elasticsearch.common.Nullable;
4544
import org.elasticsearch.common.bytes.BytesReference;
4645
import org.elasticsearch.common.collect.Tuple;
4746
import org.elasticsearch.common.inject.Inject;
@@ -160,9 +159,9 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
160159
}
161160
ops[requestIndex] = result.op;
162161
}
163-
} catch (WriteFailure e) {
164-
if (e.mappingTypeToUpdate != null) {
165-
mappingTypesToUpdate.add(e.mappingTypeToUpdate);
162+
} catch (WriteFailureException e) {
163+
if (e.getMappingTypeToUpdate() != null) {
164+
mappingTypesToUpdate.add(e.getMappingTypeToUpdate());
166165
}
167166
throw e.getCause();
168167
}
@@ -395,17 +394,6 @@ <T> T response() {
395394

396395
}
397396

398-
static class WriteFailure extends ElasticsearchException implements ElasticsearchWrapperException {
399-
@Nullable
400-
final String mappingTypeToUpdate;
401-
402-
WriteFailure(Throwable cause, String mappingTypeToUpdate) {
403-
super(null, cause);
404-
assert cause != null;
405-
this.mappingTypeToUpdate = mappingTypeToUpdate;
406-
}
407-
}
408-
409397
private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
410398
IndexShard indexShard, boolean processed) {
411399

@@ -455,7 +443,7 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
455443
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
456444
indexRequest.version(version);
457445
} catch (Throwable t) {
458-
throw new WriteFailure(t, mappingTypeToUpdate);
446+
throw new WriteFailureException(t, mappingTypeToUpdate);
459447
}
460448

461449
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());

src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

Lines changed: 45 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.ExceptionsHelper;
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.action.RoutingMissingException;
25+
import org.elasticsearch.action.WriteFailureException;
2526
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
2627
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
2728
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
@@ -39,6 +40,7 @@
3940
import org.elasticsearch.common.inject.Inject;
4041
import org.elasticsearch.common.settings.Settings;
4142
import org.elasticsearch.index.engine.Engine;
43+
import org.elasticsearch.index.mapper.DocumentMapper;
4244
import org.elasticsearch.index.mapper.SourceToParse;
4345
import org.elasticsearch.index.IndexService;
4446
import org.elasticsearch.index.shard.IndexShard;
@@ -166,7 +168,7 @@ protected ShardIterator shards(ClusterState clusterState, InternalRequest reques
166168
}
167169

168170
@Override
169-
protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
171+
protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
170172
final IndexRequest request = shardRequest.request;
171173

172174
// validate, if routing is required, that we got routing
@@ -184,43 +186,52 @@ protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(C
184186
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
185187
long version;
186188
boolean created;
187-
Engine.IndexingOperation op;
188-
if (request.opType() == IndexRequest.OpType.INDEX) {
189-
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
190-
if (index.parsedDoc().mappingsModified()) {
191-
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());
192-
}
193-
indexShard.index(index);
194-
version = index.version();
195-
op = index;
196-
created = index.created();
197-
} else {
198-
Engine.Create create = indexShard.prepareCreate(sourceToParse,
199-
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
200-
if (create.parsedDoc().mappingsModified()) {
201-
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());
189+
try {
190+
Engine.IndexingOperation op;
191+
if (request.opType() == IndexRequest.OpType.INDEX) {
192+
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
193+
if (index.parsedDoc().mappingsModified()) {
194+
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());
195+
}
196+
indexShard.index(index);
197+
version = index.version();
198+
op = index;
199+
created = index.created();
200+
} else {
201+
Engine.Create create = indexShard.prepareCreate(sourceToParse,
202+
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
203+
if (create.parsedDoc().mappingsModified()) {
204+
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());
205+
}
206+
indexShard.create(create);
207+
version = create.version();
208+
op = create;
209+
created = true;
202210
}
203-
indexShard.create(create);
204-
version = create.version();
205-
op = create;
206-
created = true;
207-
}
208-
if (request.refresh()) {
209-
try {
210-
indexShard.refresh("refresh_flag_index");
211-
} catch (Throwable e) {
212-
// ignore
211+
if (request.refresh()) {
212+
try {
213+
indexShard.refresh("refresh_flag_index");
214+
} catch (Throwable e) {
215+
// ignore
216+
}
213217
}
214-
}
218+
// update the version on the request, so it will be used for the replicas
219+
request.version(version);
220+
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
221+
assert request.versionType().validateVersionForWrites(request.version());
215222

216-
// update the version on the request, so it will be used for the replicas
217-
request.version(version);
218-
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
223+
IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created);
224+
return new PrimaryResponse<>(shardRequest.request, response, op);
219225

220-
assert request.versionType().validateVersionForWrites(request.version());
221-
222-
IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created);
223-
return new PrimaryResponse<>(shardRequest.request, response, op);
226+
} catch (WriteFailureException e) {
227+
if (e.getMappingTypeToUpdate() != null) {
228+
DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate());
229+
if (docMapper != null) {
230+
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());
231+
}
232+
}
233+
throw e.getCause();
234+
}
224235
}
225236

226237
@Override

src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
108108

109109
protected abstract String executor();
110110

111-
protected abstract PrimaryResponse<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest);
111+
protected abstract PrimaryResponse<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
112112

113113
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest);
114114

src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.ElasticsearchIllegalStateException;
3333
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
3434
import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
35+
import org.elasticsearch.action.WriteFailureException;
3536
import org.elasticsearch.cluster.routing.ShardRouting;
3637
import org.elasticsearch.cluster.routing.ShardRoutingState;
3738
import org.elasticsearch.common.Booleans;
@@ -64,6 +65,12 @@
6465
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
6566
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
6667
import org.elasticsearch.index.engine.*;
68+
import org.elasticsearch.index.engine.Engine;
69+
import org.elasticsearch.index.engine.EngineClosedException;
70+
import org.elasticsearch.index.engine.EngineException;
71+
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
72+
import org.elasticsearch.index.engine.RefreshFailedEngineException;
73+
import org.elasticsearch.index.engine.SegmentsStats;
6774
import org.elasticsearch.index.fielddata.FieldDataStats;
6875
import org.elasticsearch.index.fielddata.IndexFieldDataService;
6976
import org.elasticsearch.index.fielddata.ShardFieldData;
@@ -408,8 +415,16 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
408415
public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) throws ElasticsearchException {
409416
long startTime = System.nanoTime();
410417
Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(source.type());
411-
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
412-
return new Engine.Create(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId);
418+
try {
419+
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
420+
return new Engine.Create(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId);
421+
} catch (Throwable t) {
422+
if (docMapper.v2()) {
423+
throw new WriteFailureException(t, docMapper.v1().type());
424+
} else {
425+
throw t;
426+
}
427+
}
413428
}
414429

415430
public ParsedDocument create(Engine.Create create) throws ElasticsearchException {
@@ -427,8 +442,16 @@ public ParsedDocument create(Engine.Create create) throws ElasticsearchException
427442
public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException {
428443
long startTime = System.nanoTime();
429444
Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(source.type());
430-
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
431-
return new Engine.Index(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates);
445+
try {
446+
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
447+
return new Engine.Index(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates);
448+
} catch (Throwable t) {
449+
if (docMapper.v2()) {
450+
throw new WriteFailureException(t, docMapper.v1().type());
451+
} else {
452+
throw t;
453+
}
454+
}
432455
}
433456

434457
public ParsedDocument index(Engine.Index index) throws ElasticsearchException {
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.index.mapper.dynamic;
20+
21+
import com.google.common.base.Predicate;
22+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
23+
import org.elasticsearch.client.Client;
24+
import org.elasticsearch.index.mapper.StrictDynamicMappingException;
25+
import org.elasticsearch.test.ElasticsearchIntegrationTest;
26+
import org.junit.Test;
27+
28+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
29+
30+
31+
public class DynamicMappingIntegrationTests extends ElasticsearchIntegrationTest {
32+
33+
// https://github.com/elasticsearch/elasticsearch/issues/8423#issuecomment-64229717
34+
@Test
35+
public void testStrictAllMapping() throws Exception {
36+
String defaultMapping = jsonBuilder().startObject().startObject("_default_")
37+
.field("dynamic", "strict")
38+
.endObject().endObject().string();
39+
client().admin().indices().prepareCreate("index").addMapping("_default_", defaultMapping).get();
40+
41+
try {
42+
client().prepareIndex("index", "type", "id").setSource("test", "test").get();
43+
fail();
44+
} catch (StrictDynamicMappingException ex) {
45+
// this should not be created dynamically so we expect this exception
46+
}
47+
awaitBusy(new Predicate<Object>() {
48+
@Override
49+
public boolean apply(java.lang.Object input) {
50+
GetMappingsResponse currentMapping = client().admin().indices().prepareGetMappings("index").get();
51+
return currentMapping.getMappings().get("index").get("type") != null;
52+
}
53+
});
54+
55+
String docMapping = jsonBuilder().startObject().startObject("type")
56+
.startObject("_all")
57+
.field("enabled", false)
58+
.endObject()
59+
.endObject().endObject().string();
60+
try {
61+
client().admin().indices()
62+
.preparePutMapping("index")
63+
.setType("type")
64+
.setSource(docMapping).get();
65+
fail();
66+
} catch (Exception e) {
67+
// the mapping was created anyway with _all enabled: true, although the index request fails so we expect the update to fail
68+
}
69+
70+
// make sure type was created
71+
for (Client client : cluster()) {
72+
GetMappingsResponse mapping = client.admin().indices().prepareGetMappings("index").setLocal(true).get();
73+
assertNotNull(mapping.getMappings().get("index").get("type"));
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)