Skip to content

Commit bfbbd73

Browse files
authored
Block too many concurrent mapping updates (#51038)
Ensures that there are not too many concurrent dynamic mapping updates going out from the data nodes to the master. Closes #50670
1 parent 999884d commit bfbbd73

File tree

5 files changed

+188
-0
lines changed

5 files changed

+188
-0
lines changed

server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java

+66
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,13 @@
3030
import org.elasticsearch.common.settings.Setting.Property;
3131
import org.elasticsearch.common.settings.Settings;
3232
import org.elasticsearch.common.unit.TimeValue;
33+
import org.elasticsearch.common.util.concurrent.RunOnce;
3334
import org.elasticsearch.common.xcontent.XContentType;
3435
import org.elasticsearch.index.Index;
3536
import org.elasticsearch.index.mapper.Mapping;
3637

38+
import java.util.concurrent.Semaphore;
39+
3740
/**
3841
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
3942
* in the cluster state meta data (and broadcast to all members).
@@ -44,19 +47,30 @@ public class MappingUpdatedAction {
4447
Setting.positiveTimeSetting("indices.mapping.dynamic_timeout", TimeValue.timeValueSeconds(30),
4548
Property.Dynamic, Property.NodeScope);
4649

50+
public static final Setting<Integer> INDICES_MAX_IN_FLIGHT_UPDATES_SETTING =
51+
Setting.intSetting("indices.mapping.max_in_flight_updates", 10, 1, 1000,
52+
Property.Dynamic, Property.NodeScope);
53+
4754
private IndicesAdminClient client;
4855
private volatile TimeValue dynamicMappingUpdateTimeout;
56+
private final AdjustableSemaphore semaphore;
4957

5058
@Inject
5159
public MappingUpdatedAction(Settings settings, ClusterSettings clusterSettings) {
5260
this.dynamicMappingUpdateTimeout = INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING.get(settings);
61+
this.semaphore = new AdjustableSemaphore(INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.get(settings), true);
5362
clusterSettings.addSettingsUpdateConsumer(INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING, this::setDynamicMappingUpdateTimeout);
63+
clusterSettings.addSettingsUpdateConsumer(INDICES_MAX_IN_FLIGHT_UPDATES_SETTING, this::setMaxInFlightUpdates);
5464
}
5565

5666
private void setDynamicMappingUpdateTimeout(TimeValue dynamicMappingUpdateTimeout) {
5767
this.dynamicMappingUpdateTimeout = dynamicMappingUpdateTimeout;
5868
}
5969

70+
private void setMaxInFlightUpdates(int maxInFlightUpdates) {
71+
semaphore.setMaxPermits(maxInFlightUpdates);
72+
}
73+
6074
public void setClient(Client client) {
6175
this.client = client.admin().indices();
6276
}
@@ -68,6 +82,32 @@ public void setClient(Client client) {
6882
* potentially waiting for a master node to be available.
6983
*/
7084
public void updateMappingOnMaster(Index index, Mapping mappingUpdate, ActionListener<Void> listener) {
85+
final RunOnce release = new RunOnce(() -> semaphore.release());
86+
try {
87+
semaphore.acquire();
88+
} catch (InterruptedException e) {
89+
Thread.currentThread().interrupt();
90+
listener.onFailure(e);
91+
return;
92+
}
93+
boolean successFullySent = false;
94+
try {
95+
sendUpdateMapping(index, mappingUpdate, ActionListener.runBefore(listener, release::run));
96+
successFullySent = true;
97+
} finally {
98+
if (successFullySent == false) {
99+
release.run();
100+
}
101+
}
102+
}
103+
104+
// used by tests
105+
int blockedThreads() {
106+
return semaphore.getQueueLength();
107+
}
108+
109+
// can be overridden by tests
110+
protected void sendUpdateMapping(Index index, Mapping mappingUpdate, ActionListener<Void> listener) {
71111
client.preparePutMapping().setConcreteIndex(index).setSource(mappingUpdate.toString(), XContentType.JSON)
72112
.setMasterNodeTimeout(dynamicMappingUpdateTimeout).setTimeout(TimeValue.ZERO)
73113
.execute(new ActionListener<>() {
@@ -82,4 +122,30 @@ public void onFailure(Exception e) {
82122
}
83123
});
84124
}
125+
126+
static class AdjustableSemaphore extends Semaphore {
127+
128+
private final Object maxPermitsMutex = new Object();
129+
private int maxPermits;
130+
131+
AdjustableSemaphore(int maxPermits, boolean fair) {
132+
super(maxPermits, fair);
133+
this.maxPermits = maxPermits;
134+
}
135+
136+
void setMaxPermits(int permits) {
137+
synchronized (maxPermitsMutex) {
138+
final int diff = Math.subtractExact(permits, maxPermits);
139+
if (diff > 0) {
140+
// add permits
141+
release(diff);
142+
} else if (diff < 0) {
143+
// remove permits
144+
reducePermits(Math.negateExact(diff));
145+
}
146+
147+
maxPermits = permits;
148+
}
149+
}
150+
}
85151
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ public void apply(Settings value, Settings current, Settings previous) {
201201
IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING,
202202
IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING,
203203
MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
204+
MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING,
204205
MetaData.SETTING_READ_ONLY_SETTING,
205206
MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
206207
MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.cluster.action.index;
20+
21+
import org.elasticsearch.action.ActionListener;
22+
import org.elasticsearch.action.support.PlainActionFuture;
23+
import org.elasticsearch.cluster.action.index.MappingUpdatedAction.AdjustableSemaphore;
24+
import org.elasticsearch.common.settings.ClusterSettings;
25+
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.index.Index;
27+
import org.elasticsearch.index.mapper.Mapping;
28+
import org.elasticsearch.test.ESTestCase;
29+
30+
import java.util.List;
31+
import java.util.concurrent.CopyOnWriteArrayList;
32+
33+
public class MappingUpdatedActionTests extends ESTestCase {
34+
35+
public void testAdjustableSemaphore() {
36+
AdjustableSemaphore sem = new AdjustableSemaphore(1, randomBoolean());
37+
assertEquals(1, sem.availablePermits());
38+
assertTrue(sem.tryAcquire());
39+
assertEquals(0, sem.availablePermits());
40+
assertFalse(sem.tryAcquire());
41+
assertEquals(0, sem.availablePermits());
42+
43+
// increase the number of max permits to 2
44+
sem.setMaxPermits(2);
45+
assertEquals(1, sem.availablePermits());
46+
assertTrue(sem.tryAcquire());
47+
assertEquals(0, sem.availablePermits());
48+
49+
// release all current permits
50+
sem.release();
51+
assertEquals(1, sem.availablePermits());
52+
sem.release();
53+
assertEquals(2, sem.availablePermits());
54+
55+
// reduce number of max permits to 1
56+
sem.setMaxPermits(1);
57+
assertEquals(1, sem.availablePermits());
58+
// set back to 2
59+
sem.setMaxPermits(2);
60+
assertEquals(2, sem.availablePermits());
61+
62+
// take both permits and reduce max permits
63+
assertTrue(sem.tryAcquire());
64+
assertTrue(sem.tryAcquire());
65+
assertEquals(0, sem.availablePermits());
66+
assertFalse(sem.tryAcquire());
67+
sem.setMaxPermits(1);
68+
assertEquals(-1, sem.availablePermits());
69+
assertFalse(sem.tryAcquire());
70+
71+
// release one permit
72+
sem.release();
73+
assertEquals(0, sem.availablePermits());
74+
assertFalse(sem.tryAcquire());
75+
76+
// release second permit
77+
sem.release();
78+
assertEquals(1, sem.availablePermits());
79+
assertTrue(sem.tryAcquire());
80+
}
81+
82+
public void testMappingUpdatedActionBlocks() throws Exception {
83+
List<ActionListener<Void>> inFlightListeners = new CopyOnWriteArrayList<>();
84+
final MappingUpdatedAction mua = new MappingUpdatedAction(Settings.builder()
85+
.put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(), 1).build(),
86+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) {
87+
88+
@Override
89+
protected void sendUpdateMapping(Index index, Mapping mappingUpdate, ActionListener<Void> listener) {
90+
inFlightListeners.add(listener);
91+
}
92+
};
93+
94+
PlainActionFuture<Void> fut1 = new PlainActionFuture<>();
95+
mua.updateMappingOnMaster(null, null, fut1);
96+
assertEquals(1, inFlightListeners.size());
97+
assertEquals(0, mua.blockedThreads());
98+
99+
PlainActionFuture<Void> fut2 = new PlainActionFuture<>();
100+
Thread thread = new Thread(() -> {
101+
mua.updateMappingOnMaster(null, null, fut2); // blocked
102+
});
103+
thread.start();
104+
assertBusy(() -> assertEquals(1, mua.blockedThreads()));
105+
106+
assertEquals(1, inFlightListeners.size());
107+
assertFalse(fut1.isDone());
108+
inFlightListeners.remove(0).onResponse(null);
109+
assertTrue(fut1.isDone());
110+
111+
thread.join();
112+
assertEquals(0, mua.blockedThreads());
113+
assertEquals(1, inFlightListeners.size());
114+
assertFalse(fut2.isDone());
115+
inFlightListeners.remove(0).onResponse(null);
116+
assertTrue(fut2.isDone());
117+
}
118+
}

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -885,6 +885,7 @@ private Environment createEnvironment(String nodeName) {
885885
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo").toAbsolutePath())
886886
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
887887
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY))
888+
.put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(), 1000) // o.w. some tests might block
888889
.build());
889890
}
890891

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

+2
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,8 @@ private static Settings getRandomNodeSettings(long seed) {
471471
if (random.nextBoolean()) {
472472
builder.put(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING.getKey(),
473473
timeValueSeconds(RandomNumbers.randomIntBetween(random, 10, 30)).getStringRep());
474+
builder.put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(),
475+
RandomNumbers.randomIntBetween(random, 1, 10));
474476
}
475477

476478
// turning on the real memory circuit breaker leads to spurious test failures. As have no full control over heap usage, we

0 commit comments

Comments
 (0)