Skip to content

Block too many concurrent mapping updates #51038

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.Mapping;

import java.util.concurrent.Semaphore;

/**
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
* in the cluster state meta data (and broadcast to all members).
Expand All @@ -44,19 +47,30 @@ public class MappingUpdatedAction {
Setting.positiveTimeSetting("indices.mapping.dynamic_timeout", TimeValue.timeValueSeconds(30),
Property.Dynamic, Property.NodeScope);

public static final Setting<Integer> INDICES_MAX_IN_FLIGHT_UPDATES_SETTING =
Setting.intSetting("indices.mapping.max_in_flight_updates", 10, 1, 1000,
Property.Dynamic, Property.NodeScope);

private IndicesAdminClient client;
private volatile TimeValue dynamicMappingUpdateTimeout;
private final AdjustableSemaphore semaphore;

@Inject
public MappingUpdatedAction(Settings settings, ClusterSettings clusterSettings) {
this.dynamicMappingUpdateTimeout = INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING.get(settings);
this.semaphore = new AdjustableSemaphore(INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.get(settings), true);
clusterSettings.addSettingsUpdateConsumer(INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING, this::setDynamicMappingUpdateTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_MAX_IN_FLIGHT_UPDATES_SETTING, this::setMaxInFlightUpdates);
}

private void setDynamicMappingUpdateTimeout(TimeValue dynamicMappingUpdateTimeout) {
this.dynamicMappingUpdateTimeout = dynamicMappingUpdateTimeout;
}

private void setMaxInFlightUpdates(int maxInFlightUpdates) {
semaphore.setMaxPermits(maxInFlightUpdates);
}

public void setClient(Client client) {
this.client = client.admin().indices();
}
Expand All @@ -68,6 +82,32 @@ public void setClient(Client client) {
* potentially waiting for a master node to be available.
*/
public void updateMappingOnMaster(Index index, Mapping mappingUpdate, ActionListener<Void> listener) {
final RunOnce release = new RunOnce(() -> semaphore.release());
try {
semaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
listener.onFailure(e);
return;
}
boolean successFullySent = false;
try {
sendUpdateMapping(index, mappingUpdate, ActionListener.runBefore(listener, release::run));
successFullySent = true;
} finally {
if (successFullySent == false) {
release.run();
}
}
}

// used by tests
int blockedThreads() {
return semaphore.getQueueLength();
}

// can be overridden by tests
protected void sendUpdateMapping(Index index, Mapping mappingUpdate, ActionListener<Void> listener) {
client.preparePutMapping().setConcreteIndex(index).setSource(mappingUpdate.toString(), XContentType.JSON)
.setMasterNodeTimeout(dynamicMappingUpdateTimeout).setTimeout(TimeValue.ZERO)
.execute(new ActionListener<>() {
Expand All @@ -82,4 +122,30 @@ public void onFailure(Exception e) {
}
});
}

static class AdjustableSemaphore extends Semaphore {

private final Object maxPermitsMutex = new Object();
private int maxPermits;

AdjustableSemaphore(int maxPermits, boolean fair) {
super(maxPermits, fair);
this.maxPermits = maxPermits;
}

void setMaxPermits(int permits) {
synchronized (maxPermitsMutex) {
final int diff = Math.subtractExact(permits, maxPermits);
if (diff > 0) {
// add permits
release(diff);
} else if (diff < 0) {
// remove permits
reducePermits(Math.negateExact(diff));
}

maxPermits = permits;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING,
IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING,
MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING,
MetaData.SETTING_READ_ONLY_SETTING,
MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.action.index;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction.AdjustableSemaphore;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.test.ESTestCase;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class MappingUpdatedActionTests extends ESTestCase {

public void testAdjustableSemaphore() {
AdjustableSemaphore sem = new AdjustableSemaphore(1, randomBoolean());
assertEquals(1, sem.availablePermits());
assertTrue(sem.tryAcquire());
assertEquals(0, sem.availablePermits());
assertFalse(sem.tryAcquire());
assertEquals(0, sem.availablePermits());

// increase the number of max permits to 2
sem.setMaxPermits(2);
assertEquals(1, sem.availablePermits());
assertTrue(sem.tryAcquire());
assertEquals(0, sem.availablePermits());

// release all current permits
sem.release();
assertEquals(1, sem.availablePermits());
sem.release();
assertEquals(2, sem.availablePermits());

// reduce number of max permits to 1
sem.setMaxPermits(1);
assertEquals(1, sem.availablePermits());
// set back to 2
sem.setMaxPermits(2);
assertEquals(2, sem.availablePermits());

// take both permits and reduce max permits
assertTrue(sem.tryAcquire());
assertTrue(sem.tryAcquire());
assertEquals(0, sem.availablePermits());
assertFalse(sem.tryAcquire());
sem.setMaxPermits(1);
assertEquals(-1, sem.availablePermits());
assertFalse(sem.tryAcquire());

// release one permit
sem.release();
assertEquals(0, sem.availablePermits());
assertFalse(sem.tryAcquire());

// release second permit
sem.release();
assertEquals(1, sem.availablePermits());
assertTrue(sem.tryAcquire());
}

public void testMappingUpdatedActionBlocks() throws Exception {
List<ActionListener<Void>> inFlightListeners = new CopyOnWriteArrayList<>();
final MappingUpdatedAction mua = new MappingUpdatedAction(Settings.builder()
.put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(), 1).build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) {

@Override
protected void sendUpdateMapping(Index index, Mapping mappingUpdate, ActionListener<Void> listener) {
inFlightListeners.add(listener);
}
};

PlainActionFuture<Void> fut1 = new PlainActionFuture<>();
mua.updateMappingOnMaster(null, null, fut1);
assertEquals(1, inFlightListeners.size());
assertEquals(0, mua.blockedThreads());

PlainActionFuture<Void> fut2 = new PlainActionFuture<>();
Thread thread = new Thread(() -> {
mua.updateMappingOnMaster(null, null, fut2); // blocked
});
thread.start();
assertBusy(() -> assertEquals(1, mua.blockedThreads()));

assertEquals(1, inFlightListeners.size());
assertFalse(fut1.isDone());
inFlightListeners.remove(0).onResponse(null);
assertTrue(fut1.isDone());

thread.join();
assertEquals(0, mua.blockedThreads());
assertEquals(1, inFlightListeners.size());
assertFalse(fut2.isDone());
inFlightListeners.remove(0).onResponse(null);
assertTrue(fut2.isDone());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ private Environment createEnvironment(String nodeName) {
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo").toAbsolutePath())
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY))
.put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(), 1000) // o.w. some tests might block
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,8 @@ private static Settings getRandomNodeSettings(long seed) {
if (random.nextBoolean()) {
builder.put(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING.getKey(),
timeValueSeconds(RandomNumbers.randomIntBetween(random, 10, 30)).getStringRep());
builder.put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(),
RandomNumbers.randomIntBetween(random, 1, 10));
}

// turning on the real memory circuit breaker leads to spurious test failures. As have no full control over heap usage, we
Expand Down