Skip to content

Commit af1e70c

Browse files
committed
Do not remove flood block from indices on nodes undergoing replacement
This commit enhances `DiskThresholdMonitor` so that indices that have a flood-stage block will not have the block removed while they reside on a node that is part of a "REPLACE"-type node shutdown. This prevents a situation where a node is blocked due to disk usage, then during the replacement the block is removed while shards are relocating to the target node, indexing occurs, and then the target runs out of space due to the additional documents. Relates to elastic#70338 and elastic#76247
1 parent 5aa200c commit af1e70c

File tree

2 files changed

+202
-3
lines changed

2 files changed

+202
-3
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.cluster.DiskUsage;
2323
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2424
import org.elasticsearch.cluster.metadata.IndexMetadata;
25+
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
2526
import org.elasticsearch.cluster.node.DiscoveryNode;
2627
import org.elasticsearch.cluster.routing.RerouteService;
2728
import org.elasticsearch.cluster.routing.RoutingNode;
@@ -35,6 +36,7 @@
3536
import org.elasticsearch.common.settings.Settings;
3637
import org.elasticsearch.common.unit.ByteSizeValue;
3738
import org.elasticsearch.common.util.set.Sets;
39+
import org.elasticsearch.index.Index;
3840

3941
import java.util.ArrayList;
4042
import java.util.HashSet;
@@ -46,6 +48,8 @@
4648
import java.util.function.LongSupplier;
4749
import java.util.function.Supplier;
4850
import java.util.stream.Collectors;
51+
import java.util.stream.Stream;
52+
import java.util.stream.StreamSupport;
4953

5054
/**
5155
* Listens for a node to go over the high watermark and kicks off an empty
@@ -301,10 +305,29 @@ public void onNewInfo(ClusterInfo info) {
301305
logger.trace("no reroute required");
302306
listener.onResponse(null);
303307
}
304-
final Set<String> indicesToAutoRelease = state.routingTable().indicesRouting().stream()
305-
.map(Map.Entry::getKey)
308+
309+
// Generate a map of node name to ID so we can use it to look up node replacement targets
310+
final Map<String, String> nodeNameToId = StreamSupport.stream(state.getRoutingNodes().spliterator(), false)
311+
.collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));
312+
313+
// Calculate both the source node id and the target node id of a "replace" type shutdown
314+
final Set<String> nodesIdsPartOfReplacement = state.metadata().nodeShutdowns().values().stream()
315+
.filter(meta -> meta.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
316+
.flatMap(meta -> Stream.of(meta.getNodeId(), nodeNameToId.get(meta.getTargetNodeName())))
317+
.collect(Collectors.toSet());
318+
319+
// Generate a set of all the indices that exist on either the target or source of a node replacement
320+
final Set<String> indicesOnReplaceSourceOrTarget = nodesIdsPartOfReplacement.stream()
321+
.flatMap(nodeId -> state.getRoutingNodes().node(nodeId).copyShards().stream()
322+
.map(ShardRouting::index)
323+
.map(Index::getName))
324+
.collect(Collectors.toSet());
325+
326+
final Set<String> indicesToAutoRelease = state.routingTable().indicesRouting().keySet().stream()
306327
.filter(index -> indicesNotToAutoRelease.contains(index) == false)
307328
.filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
329+
// Do not auto release indices that are on either the source or the target of a node replacement
330+
.filter(index -> indicesOnReplaceSourceOrTarget.contains(index) == false)
308331
.collect(Collectors.toSet());
309332

310333
if (indicesToAutoRelease.isEmpty() == false) {

server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java

Lines changed: 177 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.elasticsearch.cluster.block.ClusterBlocks;
2222
import org.elasticsearch.cluster.metadata.IndexMetadata;
2323
import org.elasticsearch.cluster.metadata.Metadata;
24+
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
25+
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
2426
import org.elasticsearch.cluster.node.DiscoveryNode;
2527
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
2628
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -411,6 +413,180 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener
411413
assertNull(indicesToRelease.get());
412414
}
413415

416+
public void testNoAutoReleaseOfIndicesOnReplacementNodes() {
417+
AtomicReference<Set<String>> indicesToMarkReadOnly = new AtomicReference<>();
418+
AtomicReference<Set<String>> indicesToRelease = new AtomicReference<>();
419+
AllocationService allocation = createAllocationService(Settings.builder()
420+
.put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
421+
Metadata metadata = Metadata.builder()
422+
.put(IndexMetadata.builder("test_1").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
423+
.put(IndexMetadata.builder("test_2").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
424+
.build();
425+
RoutingTable routingTable = RoutingTable.builder()
426+
.addAsNew(metadata.index("test_1"))
427+
.addAsNew(metadata.index("test_2"))
428+
.build();
429+
final ClusterState clusterState = applyStartedShardsUntilNoChange(
430+
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
431+
.metadata(metadata).routingTable(routingTable)
432+
.nodes(DiscoveryNodes.builder().add(newNormalNode("node1")).add(newNormalNode("node2"))).build(), allocation);
433+
assertThat(clusterState.getRoutingTable().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(8));
434+
435+
final ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpacesBuilder
436+
= ImmutableOpenMap.builder();
437+
final int reservedSpaceNode1 = between(0, 10);
438+
reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node1", "/foo/bar"),
439+
new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode1).build());
440+
final int reservedSpaceNode2 = between(0, 10);
441+
reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node2", "/foo/bar"),
442+
new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode2).build());
443+
ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpaces = reservedSpacesBuilder.build();
444+
445+
DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState,
446+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () -> 0L,
447+
(reason, priority, listener) -> {
448+
assertNotNull(listener);
449+
assertThat(priority, equalTo(Priority.HIGH));
450+
listener.onResponse(clusterState);
451+
}) {
452+
@Override
453+
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
454+
if (readOnly) {
455+
assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate));
456+
} else {
457+
assertTrue(indicesToRelease.compareAndSet(null, indicesToUpdate));
458+
}
459+
listener.onResponse(null);
460+
}
461+
};
462+
indicesToMarkReadOnly.set(null);
463+
indicesToRelease.set(null);
464+
ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
465+
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
466+
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
467+
monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
468+
assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indicesToMarkReadOnly.get());
469+
assertNull(indicesToRelease.get());
470+
471+
// Reserved space is ignored when applying block
472+
indicesToMarkReadOnly.set(null);
473+
indicesToRelease.set(null);
474+
builder = ImmutableOpenMap.builder();
475+
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 90)));
476+
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(5, 90)));
477+
monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
478+
assertNull(indicesToMarkReadOnly.get());
479+
assertNull(indicesToRelease.get());
480+
481+
// Change cluster state so that "test_2" index is blocked (read only)
482+
IndexMetadata indexMetadata = IndexMetadata.builder(clusterState.metadata().index("test_2")).settings(Settings.builder()
483+
.put(clusterState.metadata()
484+
.index("test_2").getSettings())
485+
.put(IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true)).build();
486+
487+
final String sourceNode;
488+
final String targetNode;
489+
if (randomBoolean()) {
490+
sourceNode = "node1";
491+
targetNode = "node2";
492+
} else {
493+
sourceNode = "node2";
494+
targetNode = "node1";
495+
}
496+
497+
ClusterState clusterStateWithBlocks = ClusterState.builder(clusterState)
498+
.metadata(Metadata.builder(clusterState.metadata())
499+
.put(indexMetadata, true)
500+
.putCustom(NodesShutdownMetadata.TYPE,
501+
new NodesShutdownMetadata(Collections.singletonMap(sourceNode,
502+
SingleNodeShutdownMetadata.builder()
503+
.setNodeId(sourceNode)
504+
.setReason("testing")
505+
.setType(SingleNodeShutdownMetadata.Type.REPLACE)
506+
.setTargetNodeName(targetNode)
507+
.setStartedAtMillis(randomNonNegativeLong())
508+
.build())))
509+
.build())
510+
.blocks(ClusterBlocks.builder().addBlocks(indexMetadata).build())
511+
.build();
512+
513+
assertTrue(clusterStateWithBlocks.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
514+
monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterStateWithBlocks,
515+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () -> 0L,
516+
(reason, priority, listener) -> {
517+
assertNotNull(listener);
518+
assertThat(priority, equalTo(Priority.HIGH));
519+
listener.onResponse(clusterStateWithBlocks);
520+
}) {
521+
@Override
522+
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
523+
if (readOnly) {
524+
assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate));
525+
} else {
526+
assertTrue(indicesToRelease.compareAndSet(null, indicesToUpdate));
527+
}
528+
listener.onResponse(null);
529+
}
530+
};
531+
532+
// When free disk on any of node1 or node2 goes below 5% flood watermark, then apply index block on indices not having the block
533+
indicesToMarkReadOnly.set(null);
534+
indicesToRelease.set(null);
535+
builder = ImmutableOpenMap.builder();
536+
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 100)));
537+
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
538+
monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
539+
assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
540+
assertNull(indicesToRelease.get());
541+
542+
// While the REPLACE is ongoing the lock will not be removed from the index
543+
indicesToMarkReadOnly.set(null);
544+
indicesToRelease.set(null);
545+
builder = ImmutableOpenMap.builder();
546+
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
547+
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
548+
monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
549+
assertNull(indicesToMarkReadOnly.get());
550+
assertNull(indicesToRelease.get());
551+
552+
ClusterState clusterStateNoShutdown = ClusterState.builder(clusterState)
553+
.metadata(Metadata.builder(clusterState.metadata())
554+
.put(indexMetadata, true)
555+
.removeCustom(NodesShutdownMetadata.TYPE)
556+
.build())
557+
.blocks(ClusterBlocks.builder().addBlocks(indexMetadata).build())
558+
.build();
559+
560+
assertTrue(clusterStateNoShutdown.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
561+
monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterStateNoShutdown,
562+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () -> 0L,
563+
(reason, priority, listener) -> {
564+
assertNotNull(listener);
565+
assertThat(priority, equalTo(Priority.HIGH));
566+
listener.onResponse(clusterStateNoShutdown);
567+
}) {
568+
@Override
569+
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
570+
if (readOnly) {
571+
assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate));
572+
} else {
573+
assertTrue(indicesToRelease.compareAndSet(null, indicesToUpdate));
574+
}
575+
listener.onResponse(null);
576+
}
577+
};
578+
579+
// Now that the REPLACE is gone, auto-releasing can occur for the index
580+
indicesToMarkReadOnly.set(null);
581+
indicesToRelease.set(null);
582+
builder = ImmutableOpenMap.builder();
583+
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
584+
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
585+
monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
586+
assertNull(indicesToMarkReadOnly.get());
587+
assertThat(indicesToRelease.get(), contains("test_2"));
588+
}
589+
414590
@TestLogging(value="org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor:INFO", reason="testing INFO/WARN logging")
415591
public void testDiskMonitorLogging() throws IllegalAccessException {
416592
final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
@@ -662,6 +838,6 @@ private static DiscoveryNode newNormalNode(String nodeId) {
662838
Set<DiscoveryNodeRole> roles = Sets.union(randomRoles, Set.of(randomFrom(DiscoveryNodeRole.DATA_ROLE,
663839
DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE, DiscoveryNodeRole.DATA_HOT_NODE_ROLE, DiscoveryNodeRole.DATA_WARM_NODE_ROLE,
664840
DiscoveryNodeRole.DATA_COLD_NODE_ROLE)));
665-
return newNode(nodeId, roles);
841+
return newNode(nodeId, nodeId, roles);
666842
}
667843
}

0 commit comments

Comments
 (0)