Skip to content

Commit 5273410

Browse files
committed
Update mapping on master in async manner
Today, when a new mapping is introduced, the mapping is rebuilt (refreshSource) on the thread that performs the indexing request. This can become heavier and heavier if new mappings keep being introduced, so we can move this process to another thread that is responsible for refreshing the source and then sending the updated mapping to the master (note, this doesn't change the semantics of new mapping introduction, since they are async anyhow). When doing so, the thread can also try to batch as many updates as possible; this is especially handy when multiple shards for the same index exist on the same node. An internal setting that controls the time to wait for batches is also added (defaults to 0). Testing-wise, a new support method, ElasticsearchIntegrationTest#waitForConcreteMappingsOnAll, is added to allow waiting for the concrete manifestation of mappings on all relevant nodes. Some tests mistakenly rely on the fact that there are no more pending tasks to mean mappings have been updated, so if we see timing-related failures later on (all tests pass), then those will need to be fixed to either awaitBusy on the master for the new mapping or, in the rare case, wait for the concrete mapping on all the nodes using the new method. closes #6648; allow changing the additional time window dynamically; better sorting on mappers when refreshing source; also, no need to call nodes info in test, we already have the node names; clean calls to mapping update to always provide doc mapper and UUID; also use the internal cluster support method to get the list of nodes an index is on; reverse the order to pick the latest change first; remove unused field and fix constructor param; move to start/stop on mapping update action; randomize INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME
1 parent 761ef5d commit 5273410

File tree

12 files changed

+315
-165
lines changed

12 files changed

+315
-165
lines changed

src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@
5353
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
5454
import org.elasticsearch.index.engine.Engine;
5555
import org.elasticsearch.index.engine.VersionConflictEngineException;
56+
import org.elasticsearch.index.mapper.DocumentMapper;
5657
import org.elasticsearch.index.mapper.SourceToParse;
58+
import org.elasticsearch.index.service.IndexService;
5759
import org.elasticsearch.index.shard.ShardId;
5860
import org.elasticsearch.index.shard.service.IndexShard;
5961
import org.elasticsearch.indices.IndicesService;
@@ -137,7 +139,8 @@ protected ShardIterator shards(ClusterState clusterState, BulkShardRequest reque
137139
@Override
138140
protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
139141
final BulkShardRequest request = shardRequest.request;
140-
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
142+
IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index());
143+
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);
141144
Engine.IndexingOperation[] ops = null;
142145
final Set<Tuple<String, String>> mappingsToUpdate = Sets.newHashSet();
143146

@@ -180,7 +183,10 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
180183
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
181184
}
182185
for (Tuple<String, String> mappingToUpdate : mappingsToUpdate) {
183-
mappingUpdatedAction.updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2(), true);
186+
DocumentMapper docMapper = indexService.mapperService().documentMapper(mappingToUpdate.v2());
187+
if (docMapper != null) {
188+
mappingUpdatedAction.updateMappingOnMaster(mappingToUpdate.v1(), docMapper, indexService.indexUUID());
189+
}
184190
}
185191
throw (ElasticsearchException) e;
186192
}
@@ -340,7 +346,10 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
340346
}
341347

342348
for (Tuple<String, String> mappingToUpdate : mappingsToUpdate) {
343-
mappingUpdatedAction.updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2(), true);
349+
DocumentMapper docMapper = indexService.mapperService().documentMapper(mappingToUpdate.v2());
350+
if (docMapper != null) {
351+
mappingUpdatedAction.updateMappingOnMaster(mappingToUpdate.v1(), docMapper, indexService.indexUUID());
352+
}
344353
}
345354

346355
if (request.refresh()) {

src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.common.settings.Settings;
4242
import org.elasticsearch.index.engine.Engine;
4343
import org.elasticsearch.index.mapper.SourceToParse;
44+
import org.elasticsearch.index.service.IndexService;
4445
import org.elasticsearch.index.shard.service.IndexShard;
4546
import org.elasticsearch.indices.IndexAlreadyExistsException;
4647
import org.elasticsearch.indices.IndicesService;
@@ -184,7 +185,8 @@ protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(C
184185
}
185186
}
186187

187-
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
188+
IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index());
189+
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);
188190
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
189191
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
190192
long version;
@@ -193,7 +195,7 @@ protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(C
193195
if (request.opType() == IndexRequest.OpType.INDEX) {
194196
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
195197
if (index.parsedDoc().mappingsModified()) {
196-
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
198+
mappingUpdatedAction.updateMappingOnMaster(request.index(), index.docMapper(), indexService.indexUUID());
197199
}
198200
indexShard.index(index);
199201
version = index.version();
@@ -203,7 +205,7 @@ protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(C
203205
Engine.Create create = indexShard.prepareCreate(sourceToParse,
204206
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
205207
if (create.parsedDoc().mappingsModified()) {
206-
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
208+
mappingUpdatedAction.updateMappingOnMaster(request.index(), create.docMapper(), indexService.indexUUID());
207209
}
208210
indexShard.create(create);
209211
version = create.version();

src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java

Lines changed: 141 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.elasticsearch.cluster.action.index;
2121

22+
import com.google.common.collect.Lists;
23+
import com.google.common.collect.Sets;
2224
import org.elasticsearch.ElasticsearchException;
2325
import org.elasticsearch.action.ActionListener;
2426
import org.elasticsearch.action.ActionRequestValidationException;
@@ -31,19 +33,25 @@
3133
import org.elasticsearch.cluster.metadata.IndexMetaData;
3234
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
3335
import org.elasticsearch.cluster.node.DiscoveryNode;
36+
import org.elasticsearch.common.collect.Tuple;
3437
import org.elasticsearch.common.compress.CompressedString;
3538
import org.elasticsearch.common.inject.Inject;
3639
import org.elasticsearch.common.io.stream.StreamInput;
3740
import org.elasticsearch.common.io.stream.StreamOutput;
3841
import org.elasticsearch.common.settings.Settings;
42+
import org.elasticsearch.common.unit.TimeValue;
43+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
44+
import org.elasticsearch.common.util.concurrent.EsExecutors;
3945
import org.elasticsearch.index.mapper.DocumentMapper;
40-
import org.elasticsearch.index.mapper.MapperService;
41-
import org.elasticsearch.indices.IndicesService;
46+
import org.elasticsearch.node.settings.NodeSettingsService;
4247
import org.elasticsearch.threadpool.ThreadPool;
4348
import org.elasticsearch.transport.TransportService;
4449

4550
import java.io.IOException;
46-
import java.util.concurrent.CountDownLatch;
51+
import java.util.Collections;
52+
import java.util.List;
53+
import java.util.Set;
54+
import java.util.concurrent.BlockingQueue;
4755
import java.util.concurrent.TimeUnit;
4856
import java.util.concurrent.atomic.AtomicLong;
4957

@@ -53,75 +61,49 @@
5361
*/
5462
public class MappingUpdatedAction extends TransportMasterNodeOperationAction<MappingUpdatedAction.MappingUpdatedRequest, MappingUpdatedAction.MappingUpdatedResponse> {
5563

64+
public static final String INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME = "indices.mapping.additional_mapping_change_time";
65+
5666
private final AtomicLong mappingUpdateOrderGen = new AtomicLong();
5767
private final MetaDataMappingService metaDataMappingService;
58-
private final IndicesService indicesService;
5968

60-
private final boolean waitForMappingChange;
69+
private volatile MasterMappingUpdater masterMappingUpdater;
70+
71+
private volatile TimeValue additionalMappingChangeTime;
72+
73+
class ApplySettings implements NodeSettingsService.Listener {
74+
@Override
75+
public void onRefreshSettings(Settings settings) {
76+
final TimeValue current = MappingUpdatedAction.this.additionalMappingChangeTime;
77+
final TimeValue newValue = settings.getAsTime(INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME, current);
78+
if (!current.equals(newValue)) {
79+
logger.info("updating " + INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME + " from [{}] to [{}]", current, newValue);
80+
MappingUpdatedAction.this.additionalMappingChangeTime = newValue;
81+
}
82+
}
83+
}
6184

6285
@Inject
6386
public MappingUpdatedAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
64-
MetaDataMappingService metaDataMappingService, IndicesService indicesService) {
87+
MetaDataMappingService metaDataMappingService, NodeSettingsService nodeSettingsService) {
6588
super(settings, transportService, clusterService, threadPool);
6689
this.metaDataMappingService = metaDataMappingService;
67-
this.indicesService = indicesService;
68-
this.waitForMappingChange = settings.getAsBoolean("action.wait_on_mapping_change", false);
90+
// this setting should probably always be 0, just add the option to wait for more changes within a time window
91+
this.additionalMappingChangeTime = settings.getAsTime(INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME, TimeValue.timeValueMillis(0));
92+
nodeSettingsService.addListener(new ApplySettings());
6993
}
7094

71-
public void updateMappingOnMaster(String index, String type, boolean neverWaitForMappingChange) {
72-
IndexMetaData metaData = clusterService.state().metaData().index(index);
73-
if (metaData != null) {
74-
updateMappingOnMaster(index, type, metaData.getUUID(), neverWaitForMappingChange);
75-
}
95+
public void start() {
96+
this.masterMappingUpdater = new MasterMappingUpdater(EsExecutors.threadName(settings, "master_mapping_updater"));
97+
this.masterMappingUpdater.start();
7698
}
7799

78-
public void updateMappingOnMaster(String index, String type, String indexUUID, boolean neverWaitForMappingChange) {
79-
final MapperService mapperService = indicesService.indexServiceSafe(index).mapperService();
80-
final DocumentMapper documentMapper = mapperService.documentMapper(type);
81-
if (documentMapper != null) { // should not happen
82-
updateMappingOnMaster(documentMapper, index, type, indexUUID, neverWaitForMappingChange);
83-
}
100+
public void stop() {
101+
this.masterMappingUpdater.close();
102+
this.masterMappingUpdater = null;
84103
}
85104

86-
public void updateMappingOnMaster(DocumentMapper documentMapper, String index, String type, String indexUUID, boolean neverWaitForMappingChange) {
87-
final CountDownLatch latch = new CountDownLatch(1);
88-
final MappingUpdatedAction.MappingUpdatedRequest mappingRequest;
89-
try {
90-
// we generate the order id before we get the mapping to send and refresh the source, so
91-
// if 2 happen concurrently, we know that the later order will include the previous one
92-
long orderId = mappingUpdateOrderGen.incrementAndGet();
93-
documentMapper.refreshSource();
94-
DiscoveryNode node = clusterService.localNode();
95-
mappingRequest = new MappingUpdatedAction.MappingUpdatedRequest(
96-
index, indexUUID, type, documentMapper.mappingSource(), orderId, node != null ? node.id() : null
97-
);
98-
} catch (Throwable t) {
99-
logger.warn("Failed to update master on updated mapping for index [" + index + "], type [" + type + "]", t);
100-
latch.countDown();
101-
throw t;
102-
}
103-
logger.trace("Sending mapping updated to master: {}", mappingRequest);
104-
execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
105-
@Override
106-
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
107-
// all is well
108-
latch.countDown();
109-
logger.debug("Successfully updated master with mapping update: {}", mappingRequest);
110-
}
111-
112-
@Override
113-
public void onFailure(Throwable e) {
114-
latch.countDown();
115-
logger.warn("Failed to update master on updated mapping for {}", e, mappingRequest);
116-
}
117-
});
118-
if (waitForMappingChange && !neverWaitForMappingChange) {
119-
try {
120-
latch.await(5, TimeUnit.SECONDS);
121-
} catch (InterruptedException e) {
122-
Thread.currentThread().interrupt();
123-
}
124-
}
105+
public void updateMappingOnMaster(String index, DocumentMapper documentMapper, String indexUUID) {
106+
masterMappingUpdater.add(new MappingChange(documentMapper, index, indexUUID));
125107
}
126108

127109
@Override
@@ -256,4 +238,105 @@ public String toString() {
256238
return "index [" + index + "], indexUUID [" + indexUUID + "], type [" + type + "] and source [" + mappingSource + "]";
257239
}
258240
}
241+
242+
private static class MappingChange {
243+
public final DocumentMapper documentMapper;
244+
public final String index;
245+
public final String indexUUID;
246+
247+
MappingChange(DocumentMapper documentMapper, String index, String indexUUID) {
248+
this.documentMapper = documentMapper;
249+
this.index = index;
250+
this.indexUUID = indexUUID;
251+
}
252+
}
253+
254+
/**
255+
* The master mapping updater removes the overhead of refreshing the mapping (refreshSource) on the
256+
* indexing thread.
257+
* <p/>
258+
* It also allows to reduce multiple mapping updates on the same index(UUID) and type into one update
259+
* (refreshSource + sending to master), which allows to offload the number of times mappings are updated
260+
* and sent to master for heavy single index requests that each introduce a new mapping, and when
261+
* multiple shards exists on the same nodes, allowing to work on the index level in this case.
262+
*/
263+
private class MasterMappingUpdater extends Thread {
264+
265+
private volatile boolean running = true;
266+
private final BlockingQueue<MappingChange> queue = ConcurrentCollections.newBlockingQueue();
267+
268+
public MasterMappingUpdater(String name) {
269+
super(name);
270+
}
271+
272+
public void add(MappingChange change) {
273+
queue.add(change);
274+
}
275+
276+
public void close() {
277+
running = false;
278+
this.interrupt();
279+
}
280+
281+
@Override
282+
public void run() {
283+
while (running) {
284+
try {
285+
MappingChange polledChange = queue.poll(10, TimeUnit.MINUTES);
286+
if (polledChange == null) {
287+
continue;
288+
}
289+
List<MappingChange> changes = Lists.newArrayList(polledChange);
290+
if (additionalMappingChangeTime.millis() > 0) {
291+
Thread.sleep(additionalMappingChangeTime.millis());
292+
}
293+
queue.drainTo(changes);
294+
Collections.reverse(changes); // process then in newest one to oldest
295+
Set<Tuple<String, String>> seenIndexAndTypes = Sets.newHashSet();
296+
for (MappingChange change : changes) {
297+
Tuple<String, String> checked = Tuple.tuple(change.indexUUID, change.documentMapper.type());
298+
if (seenIndexAndTypes.contains(checked)) {
299+
continue;
300+
}
301+
seenIndexAndTypes.add(checked);
302+
303+
final MappingUpdatedAction.MappingUpdatedRequest mappingRequest;
304+
try {
305+
// we generate the order id before we get the mapping to send and refresh the source, so
306+
// if 2 happen concurrently, we know that the later order will include the previous one
307+
long orderId = mappingUpdateOrderGen.incrementAndGet();
308+
change.documentMapper.refreshSource();
309+
DiscoveryNode node = clusterService.localNode();
310+
mappingRequest = new MappingUpdatedAction.MappingUpdatedRequest(
311+
change.index, change.indexUUID, change.documentMapper.type(), change.documentMapper.mappingSource(), orderId, node != null ? node.id() : null
312+
);
313+
} catch (Throwable t) {
314+
logger.warn("Failed to update master on updated mapping for index [" + change.index + "], type [" + change.documentMapper.type() + "]", t);
315+
continue;
316+
}
317+
logger.trace("sending mapping updated to master: {}", mappingRequest);
318+
execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
319+
@Override
320+
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
321+
logger.debug("successfully updated master with mapping update: {}", mappingRequest);
322+
}
323+
324+
@Override
325+
public void onFailure(Throwable e) {
326+
logger.warn("failed to update master on updated mapping for {}", e, mappingRequest);
327+
}
328+
});
329+
330+
}
331+
} catch (InterruptedException e) {
332+
// are we shutting down? continue and check
333+
if (running) {
334+
logger.warn("failed to process mapping updates", e);
335+
}
336+
} catch (Throwable t) {
337+
logger.warn("failed to process mapping updates", t);
338+
}
339+
}
340+
}
341+
}
259342
}

src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.action.support.DestructiveOperations;
2323
import org.elasticsearch.cluster.InternalClusterInfoService;
24+
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
2425
import org.elasticsearch.cluster.metadata.MetaData;
2526
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
2627
import org.elasticsearch.cluster.routing.allocation.decider.*;
@@ -62,6 +63,7 @@ public ClusterDynamicSettingsModule() {
6263
clusterDynamicSettings.addDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_TYPE);
6364
clusterDynamicSettings.addDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
6465
clusterDynamicSettings.addDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME);
66+
clusterDynamicSettings.addDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME, Validator.TIME);
6567
clusterDynamicSettings.addDynamicSetting(MetaData.SETTING_READ_ONLY);
6668
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, Validator.BYTES_SIZE);
6769
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, Validator.INTEGER);

0 commit comments

Comments
 (0)