Skip to content

Commit bd5c6c4

Browse files
authored
Add preflight check to dynamic mapping updates (#48867)
Today if the primary discovers that an indexing request needs a mapping update then it will send it to the master for validation and processing. If, however, the put-mapping request is invalid then the master still processes it as a (no-op) cluster state update. When there are a large number of indexing operations that result in invalid mapping updates this can overwhelm the master. However, the primary already has a reasonably up-to-date mapping against which it can check the (approximate) validity of the put-mapping request before sending it to the master. For instance it is not possible to remove fields in a mapping update, so if the primary detects that a mapping update will exceed the fields limit then it can reject it itself and avoid bothering the master. This commit adds a pre-flight check to the mapping update path so that the primary can discard obviously-invalid put-mapping requests itself. Fixes #35564 Backport of #48817
1 parent 24f7d4e commit bd5c6c4

File tree

5 files changed

+97
-12
lines changed

5 files changed

+97
-12
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

+14
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,19 @@
4848
import org.elasticsearch.cluster.service.ClusterService;
4949
import org.elasticsearch.common.bytes.BytesReference;
5050
import org.elasticsearch.common.collect.Tuple;
51+
import org.elasticsearch.common.compress.CompressedXContent;
5152
import org.elasticsearch.common.inject.Inject;
5253
import org.elasticsearch.common.io.stream.StreamInput;
5354
import org.elasticsearch.common.settings.Settings;
5455
import org.elasticsearch.common.unit.TimeValue;
56+
import org.elasticsearch.common.xcontent.ToXContent;
5557
import org.elasticsearch.common.xcontent.XContentHelper;
5658
import org.elasticsearch.common.xcontent.XContentType;
5759
import org.elasticsearch.index.engine.Engine;
5860
import org.elasticsearch.index.engine.VersionConflictEngineException;
5961
import org.elasticsearch.index.get.GetResult;
6062
import org.elasticsearch.index.mapper.MapperException;
63+
import org.elasticsearch.index.mapper.MapperService;
6164
import org.elasticsearch.index.mapper.SourceToParse;
6265
import org.elasticsearch.index.seqno.SequenceNumbers;
6366
import org.elasticsearch.index.shard.IndexShard;
@@ -260,6 +263,17 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat
260263
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
261264
}
262265
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
266+
267+
try {
268+
primary.mapperService().merge(context.getRequestToExecute().type(),
269+
new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS),
270+
MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT);
271+
} catch (Exception e) {
272+
logger.info(() -> new ParameterizedMessage("{} mapping update rejected by primary", primary.shardId()), e);
273+
onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
274+
return true;
275+
}
276+
263277
mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
264278
context.getRequestToExecute().type(),
265279
new ActionListener<Void>() {

server/src/main/java/org/elasticsearch/index/mapper/MapperService.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
8585
* The reason why a mapping is being merged.
8686
*/
8787
public enum MergeReason {
88+
/**
89+
* Pre-flight check before sending a mapping update to the master
90+
*/
91+
MAPPING_UPDATE_PREFLIGHT,
8892
/**
8993
* Create or update a mapping.
9094
*/
@@ -341,6 +345,7 @@ public DocumentMapper merge(String type, CompressedXContent mappingSource, Merge
341345

342346
private synchronized Map<String, DocumentMapper> internalMerge(IndexMetaData indexMetaData,
343347
MergeReason reason, boolean onlyUpdateIfNeeded) {
348+
assert reason != MergeReason.MAPPING_UPDATE_PREFLIGHT;
344349
Map<String, CompressedXContent> map = new LinkedHashMap<>();
345350
for (ObjectCursor<MappingMetaData> cursor : indexMetaData.getMappings().values()) {
346351
MappingMetaData mappingMetaData = cursor.value;
@@ -494,7 +499,7 @@ private synchronized Map<String, DocumentMapper> internalMerge(@Nullable Documen
494499

495500
ContextMapping.validateContextPaths(indexSettings.getIndexVersionCreated(), fieldMappers, fieldTypes::get);
496501

497-
if (reason == MergeReason.MAPPING_UPDATE) {
502+
if (reason == MergeReason.MAPPING_UPDATE || reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
498503
// this check will only be performed on the master node when there is
499504
// a call to the update mapping API. For all other cases like
500505
// the master node restoring mappings from disk or data nodes
@@ -509,7 +514,7 @@ private synchronized Map<String, DocumentMapper> internalMerge(@Nullable Documen
509514
results.put(newMapper.type(), newMapper);
510515
}
511516

512-
if (reason == MergeReason.MAPPING_UPDATE) {
517+
if (reason == MergeReason.MAPPING_UPDATE || reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
513518
// this check will only be performed on the master node when there is
514519
// a call to the update mapping API. For all other cases like
515520
// the master node restoring mappings from disk or data nodes
@@ -532,6 +537,10 @@ private synchronized Map<String, DocumentMapper> internalMerge(@Nullable Documen
532537
// make structures immutable
533538
results = Collections.unmodifiableMap(results);
534539

540+
if (reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
541+
return results;
542+
}
543+
535544
// only need to immutably rewrap these if the previous reference was changed.
536545
// if not then they are already implicitly immutable.
537546
if (fullPathObjectMappers != this.fullPathObjectMappers) {

server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,10 @@
4444
import org.elasticsearch.index.VersionType;
4545
import org.elasticsearch.index.engine.Engine;
4646
import org.elasticsearch.index.engine.VersionConflictEngineException;
47+
import org.elasticsearch.index.mapper.MapperService;
4748
import org.elasticsearch.index.mapper.Mapping;
4849
import org.elasticsearch.index.mapper.MetadataFieldMapper;
50+
import org.elasticsearch.index.mapper.RootObjectMapper;
4951
import org.elasticsearch.index.shard.IndexShard;
5052
import org.elasticsearch.index.shard.IndexShardTestCase;
5153
import org.elasticsearch.index.shard.ShardId;
@@ -235,14 +237,15 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
235237
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
236238

237239
Engine.IndexResult mappingUpdate =
238-
new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap()));
240+
new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()));
239241
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
240242
Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation);
241243

242244
IndexShard shard = mock(IndexShard.class);
243245
when(shard.shardId()).thenReturn(shardId);
244246
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
245247
.thenReturn(mappingUpdate);
248+
when(shard.mapperService()).thenReturn(mock(MapperService.class));
246249

247250
randomlySetIgnoredPrimaryResponse(items[0]);
248251

@@ -770,7 +773,7 @@ public void testRetries() throws Exception {
770773
"I'm conflicted <(;_;)>");
771774
Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0);
772775
Engine.IndexResult mappingUpdate =
773-
new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap()));
776+
new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()));
774777
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
775778
Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation);
776779

@@ -787,6 +790,7 @@ public void testRetries() throws Exception {
787790
});
788791
when(shard.indexSettings()).thenReturn(indexSettings);
789792
when(shard.shardId()).thenReturn(shardId);
793+
when(shard.mapperService()).thenReturn(mock(MapperService.class));
790794

791795
UpdateHelper updateHelper = mock(UpdateHelper.class);
792796
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(

server/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java

+43
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,14 @@
2121
import org.elasticsearch.action.DocWriteResponse;
2222
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
2323
import org.elasticsearch.action.bulk.BulkResponse;
24+
import org.elasticsearch.action.index.IndexRequestBuilder;
25+
import org.elasticsearch.cluster.ClusterState;
26+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2427
import org.elasticsearch.cluster.metadata.MappingMetaData;
2528
import org.elasticsearch.common.collect.ImmutableOpenMap;
29+
import org.elasticsearch.cluster.service.ClusterService;
30+
import org.elasticsearch.common.settings.Settings;
31+
import org.elasticsearch.common.unit.TimeValue;
2632
import org.elasticsearch.plugins.Plugin;
2733
import org.elasticsearch.test.ESIntegTestCase;
2834
import org.elasticsearch.test.InternalSettingsPlugin;
@@ -35,6 +41,8 @@
3541
import java.util.concurrent.CountDownLatch;
3642
import java.util.concurrent.atomic.AtomicReference;
3743

44+
import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;
45+
3846
public class DynamicMappingIT extends ESIntegTestCase {
3947

4048
@Override
@@ -119,4 +127,39 @@ public void run() {
119127
assertTrue(client().prepareGet("index", "type", Integer.toString(i)).get().isExists());
120128
}
121129
}
130+
131+
public void testPreflightCheckAvoidsMaster() throws InterruptedException {
132+
createIndex("index", Settings.builder().put(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), 2).build());
133+
ensureGreen("index");
134+
client().prepareIndex("index", MapperService.SINGLE_MAPPING_NAME).setId("1").setSource("field1", "value1").get();
135+
136+
final CountDownLatch masterBlockedLatch = new CountDownLatch(1);
137+
final CountDownLatch indexingCompletedLatch = new CountDownLatch(1);
138+
139+
internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName()).submitStateUpdateTask("block-state-updates",
140+
new ClusterStateUpdateTask() {
141+
@Override
142+
public ClusterState execute(ClusterState currentState) throws Exception {
143+
masterBlockedLatch.countDown();
144+
indexingCompletedLatch.await();
145+
return currentState;
146+
}
147+
148+
@Override
149+
public void onFailure(String source, Exception e) {
150+
throw new AssertionError("unexpected", e);
151+
}
152+
});
153+
154+
masterBlockedLatch.await();
155+
final IndexRequestBuilder indexRequestBuilder
156+
= client().prepareIndex("index", MapperService.SINGLE_MAPPING_NAME).setId("2").setSource("field2", "value2");
157+
try {
158+
assertThat(
159+
expectThrows(IllegalArgumentException.class, () -> indexRequestBuilder.get(TimeValue.timeValueSeconds(10))).getMessage(),
160+
Matchers.containsString("Limit of total fields [2] in index [index] has been exceeded"));
161+
} finally {
162+
indexingCompletedLatch.countDown();
163+
}
164+
}
122165
}

server/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java

+23-8
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757

5858
import static org.hamcrest.CoreMatchers.containsString;
5959
import static org.hamcrest.Matchers.instanceOf;
60+
import static org.hamcrest.Matchers.notNullValue;
61+
import static org.hamcrest.Matchers.nullValue;
6062

6163
public class MapperServiceTests extends ESSingleNodeTestCase {
6264

@@ -127,6 +129,15 @@ public void testIndexIntoDefaultMapping() throws Throwable {
127129
assertNull(indexService.mapperService().documentMapper(MapperService.DEFAULT_MAPPING));
128130
}
129131

132+
public void testPreflightUpdateDoesNotChangeMapping() throws Throwable {
133+
final MapperService mapperService = createIndex("test1").mapperService();
134+
final CompressedXContent mapping = createMappingSpecifyingNumberOfFields(1);
135+
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE_PREFLIGHT);
136+
assertThat("field was not created by preflight check", mapperService.fullName("field0"), nullValue());
137+
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
138+
assertThat("field was not created by mapping update", mapperService.fullName("field0"), notNullValue());
139+
}
140+
130141
/**
131142
* Test that we can have at least the number of fields in new mappings that are defined by "index.mapping.total_fields.limit".
132143
* Any additional field should trigger an IllegalArgumentException.
@@ -141,7 +152,7 @@ public void testTotalFieldsLimit() throws Throwable {
141152
// adding one more field should trigger exception
142153
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
143154
createIndex("test2", settings).mapperService().merge("type",
144-
createMappingSpecifyingNumberOfFields(totalFieldsLimit + 1), MergeReason.MAPPING_UPDATE);
155+
createMappingSpecifyingNumberOfFields(totalFieldsLimit + 1), updateOrPreflight());
145156
});
146157
assertTrue(e.getMessage(),
147158
e.getMessage().contains("Limit of total fields [" + totalFieldsLimit + "] in index [test2] has been exceeded"));
@@ -177,7 +188,7 @@ public void testMappingDepthExceedsLimit() throws Throwable {
177188
indexService2.mapperService().merge("type", objectMapping, MergeReason.MAPPING_UPDATE);
178189

179190
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
180-
() -> indexService1.mapperService().merge("type", objectMapping, MergeReason.MAPPING_UPDATE));
191+
() -> indexService1.mapperService().merge("type", objectMapping, updateOrPreflight()));
181192
assertThat(e.getMessage(), containsString("Limit of mapping depth [1] in index [test1] has been exceeded"));
182193
}
183194

@@ -228,7 +239,7 @@ public void testIndexSortWithNestedFields() throws IOException {
228239
.endObject().endObject()));
229240
invalidNestedException = expectThrows(IllegalArgumentException.class,
230241
() -> indexService.mapperService().merge("t", nestedFieldMapping,
231-
MergeReason.MAPPING_UPDATE));
242+
updateOrPreflight()));
232243
assertThat(invalidNestedException.getMessage(),
233244
containsString("cannot have nested fields when index sort is activated"));
234245
}
@@ -264,7 +275,7 @@ public void testFieldAliasWithMismatchedNestedScope() throws Throwable {
264275
.endObject()));
265276

266277
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
267-
() -> mapperService.merge("type", mappingUpdate, MergeReason.MAPPING_UPDATE));
278+
() -> mapperService.merge("type", mappingUpdate, updateOrPreflight()));
268279
assertThat(e.getMessage(), containsString("Invalid [path] value [nested.field] for field alias [alias]"));
269280
}
270281

@@ -292,7 +303,7 @@ public void testTotalFieldsLimitWithFieldAlias() throws Throwable {
292303
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
293304
createIndex("test2",
294305
Settings.builder().put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), numberOfNonAliasFields).build())
295-
.mapperService().merge("type", new CompressedXContent(mapping), MergeReason.MAPPING_UPDATE);
306+
.mapperService().merge("type", new CompressedXContent(mapping), updateOrPreflight());
296307
});
297308
assertEquals("Limit of total fields [" + numberOfNonAliasFields + "] in index [test2] has been exceeded", e.getMessage());
298309
}
@@ -334,7 +345,7 @@ public void testFieldNameLengthLimit() throws Throwable {
334345
.endObject()));
335346

336347
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
337-
mapperService.merge("type", mappingUpdate, MergeReason.MAPPING_UPDATE);
348+
mapperService.merge("type", mappingUpdate, updateOrPreflight());
338349
});
339350

340351
assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
@@ -359,7 +370,7 @@ public void testObjectNameLengthLimit() throws Throwable {
359370
.endObject().endObject()));
360371

361372
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
362-
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
373+
mapperService.merge("type", mapping, updateOrPreflight());
363374
});
364375

365376
assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
@@ -388,7 +399,7 @@ public void testAliasFieldNameLengthLimit() throws Throwable {
388399
.endObject().endObject()));
389400

390401
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
391-
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
402+
mapperService.merge("type", mapping, updateOrPreflight());
392403
});
393404

394405
assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
@@ -479,6 +490,10 @@ private boolean assertSameContainedFilters(TokenFilterFactory[] originalTokenFil
479490
return true;
480491
}
481492

493+
private static MergeReason updateOrPreflight() {
494+
return randomFrom(MergeReason.MAPPING_UPDATE, MergeReason.MAPPING_UPDATE_PREFLIGHT);
495+
}
496+
482497
public static final class ReloadableFilterPlugin extends Plugin implements AnalysisPlugin {
483498

484499
@Override

0 commit comments

Comments
 (0)