|
21 | 21 | import org.elasticsearch.cluster.block.ClusterBlocks;
|
22 | 22 | import org.elasticsearch.cluster.metadata.IndexMetadata;
|
23 | 23 | import org.elasticsearch.cluster.metadata.Metadata;
|
| 24 | +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; |
| 25 | +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; |
24 | 26 | import org.elasticsearch.cluster.node.DiscoveryNode;
|
25 | 27 | import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
26 | 28 | import org.elasticsearch.cluster.node.DiscoveryNodes;
|
@@ -411,6 +413,154 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener
|
411 | 413 | assertNull(indicesToRelease.get());
|
412 | 414 | }
|
413 | 415 |
|
/**
 * Checks that the flood-stage read-only block is never auto-released from an index while a
 * REPLACE-type node shutdown is registered for any node, and that auto-release resumes once
 * the shutdown metadata is removed from the cluster state.
 */
public void testNoAutoReleaseOfIndicesOnReplacementNodes() {
    // Record what the monitor asks to mark read-only / to release; each direction may fire at most once.
    AtomicReference<Set<String>> indicesToMarkReadOnly = new AtomicReference<>();
    AtomicReference<Set<String>> indicesToRelease = new AtomicReference<>();
    AtomicReference<ClusterState> currentClusterState = new AtomicReference<>();
    AllocationService allocation = createAllocationService(Settings.builder()
        .put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
    // Two indices, each 2 shards x 1 replica = 8 started shard copies across the two nodes.
    Metadata metadata = Metadata.builder()
        .put(IndexMetadata.builder("test_1").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
        .put(IndexMetadata.builder("test_2").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
        .build();
    RoutingTable routingTable = RoutingTable.builder()
        .addAsNew(metadata.index("test_1"))
        .addAsNew(metadata.index("test_2"))
        .build();
    // Node ids ("node1") and names ("my-node1") deliberately differ: a REPLACE shutdown below
    // identifies its source by node id but its target by node *name*.
    final ClusterState clusterState = applyStartedShardsUntilNoChange(
        ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
            .metadata(metadata).routingTable(routingTable)
            .nodes(DiscoveryNodes.builder().add(newNormalNode("node1", "my-node1"))
                .add(newNormalNode("node2", "my-node2"))).build(), allocation);
    assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(8));

    // Arbitrary reserved space on both nodes; a later step verifies it does not affect blocking.
    final ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpacesBuilder
        = ImmutableOpenMap.builder();
    final int reservedSpaceNode1 = between(0, 10);
    reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node1", "/foo/bar"),
        new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode1).build());
    final int reservedSpaceNode2 = between(0, 10);
    reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node2", "/foo/bar"),
        new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode2).build());
    ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpaces = reservedSpacesBuilder.build();

    currentClusterState.set(clusterState);

    // Monitor under test: the reroute callback just replays the current cluster state, and
    // updateIndicesReadOnly is stubbed to record what was requested (compareAndSet asserts
    // the monitor never issues the same kind of update twice per onNewInfo round).
    final DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, currentClusterState::get,
        new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () -> 0L,
        (reason, priority, listener) -> {
            assertNotNull(listener);
            assertThat(priority, equalTo(Priority.HIGH));
            listener.onResponse(currentClusterState.get());
        }) {
        @Override
        protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
            if (readOnly) {
                assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate));
            } else {
                assertTrue(indicesToRelease.compareAndSet(null, indicesToUpdate));
            }
            listener.onResponse(null);
        }
    };
    // Both nodes under 5% free: every index gets the read-only block, nothing is released.
    indicesToMarkReadOnly.set(null);
    indicesToRelease.set(null);
    ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
    builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
    builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
    monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
    assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indicesToMarkReadOnly.get());
    assertNull(indicesToRelease.get());

    // Reserved space is ignored when applying block
    indicesToMarkReadOnly.set(null);
    indicesToRelease.set(null);
    builder = ImmutableOpenMap.builder();
    builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 90)));
    builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(5, 90)));
    monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
    assertNull(indicesToMarkReadOnly.get());
    assertNull(indicesToRelease.get());

    // Change cluster state so that "test_2" index is blocked (read only)
    IndexMetadata indexMetadata = IndexMetadata.builder(clusterState.metadata().index("test_2")).settings(Settings.builder()
        .put(clusterState.metadata()
            .index("test_2").getSettings())
        .put(IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true)).build();

    // Randomly pick which node is being replaced; the source is a node id, the target the
    // *name* of the other node.
    final String sourceNode;
    final String targetNode;
    if (randomBoolean()) {
        sourceNode = "node1";
        targetNode = "my-node2";
    } else {
        sourceNode = "node2";
        targetNode = "my-node1";
    }

    // State with test_2 read-only-blocked AND an in-progress REPLACE shutdown for sourceNode.
    final ClusterState clusterStateWithBlocks = ClusterState.builder(clusterState)
        .metadata(Metadata.builder(clusterState.metadata())
            .put(indexMetadata, true)
            .putCustom(NodesShutdownMetadata.TYPE,
                new NodesShutdownMetadata(Collections.singletonMap(sourceNode,
                    SingleNodeShutdownMetadata.builder()
                        .setNodeId(sourceNode)
                        .setReason("testing")
                        .setType(SingleNodeShutdownMetadata.Type.REPLACE)
                        .setTargetNodeName(targetNode)
                        .setStartedAtMillis(randomNonNegativeLong())
                        .build())))
            .build())
        .blocks(ClusterBlocks.builder().addBlocks(indexMetadata).build())
        .build();

    assertTrue(clusterStateWithBlocks.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));

    currentClusterState.set(clusterStateWithBlocks);

    // When free disk on any of node1 or node2 goes below 5% flood watermark, then apply index block on indices not having the block
    indicesToMarkReadOnly.set(null);
    indicesToRelease.set(null);
    builder = ImmutableOpenMap.builder();
    builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 100)));
    builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
    monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
    assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
    assertNull(indicesToRelease.get());

    // While the REPLACE is ongoing the lock will not be removed from the index
    indicesToMarkReadOnly.set(null);
    indicesToRelease.set(null);
    builder = ImmutableOpenMap.builder();
    builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
    builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
    monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
    assertNull(indicesToMarkReadOnly.get());
    assertNull(indicesToRelease.get());

    // Same blocked index, but with the shutdown metadata removed.
    final ClusterState clusterStateNoShutdown = ClusterState.builder(clusterState)
        .metadata(Metadata.builder(clusterState.metadata())
            .put(indexMetadata, true)
            .removeCustom(NodesShutdownMetadata.TYPE)
            .build())
        .blocks(ClusterBlocks.builder().addBlocks(indexMetadata).build())
        .build();

    assertTrue(clusterStateNoShutdown.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));

    currentClusterState.set(clusterStateNoShutdown);

    // Now that the REPLACE is gone, auto-releasing can occur for the index
    indicesToMarkReadOnly.set(null);
    indicesToRelease.set(null);
    builder = ImmutableOpenMap.builder();
    builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
    builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
    monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
    assertNull(indicesToMarkReadOnly.get());
    assertThat(indicesToRelease.get(), contains("test_2"));
}
| 563 | + |
414 | 564 | @TestLogging(value="org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor:INFO", reason="testing INFO/WARN logging")
|
415 | 565 | public void testDiskMonitorLogging() throws IllegalAccessException {
|
416 | 566 | final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
|
@@ -656,12 +806,16 @@ private static DiscoveryNode newFrozenOnlyNode(String nodeId) {
|
656 | 806 | return newNode(nodeId, Sets.union(Set.of(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE), irrelevantRoles));
|
657 | 807 | }
|
658 | 808 |
|
659 |
| - private static DiscoveryNode newNormalNode(String nodeId) { |
| 809 | + private static DiscoveryNode newNormalNode(String nodeId, String nodeName) { |
660 | 810 | Set<DiscoveryNodeRole> randomRoles =
|
661 | 811 | new HashSet<>(randomSubsetOf(DiscoveryNodeRole.roles()));
|
662 | 812 | Set<DiscoveryNodeRole> roles = Sets.union(randomRoles, Set.of(randomFrom(DiscoveryNodeRole.DATA_ROLE,
|
663 | 813 | DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE, DiscoveryNodeRole.DATA_HOT_NODE_ROLE, DiscoveryNodeRole.DATA_WARM_NODE_ROLE,
|
664 | 814 | DiscoveryNodeRole.DATA_COLD_NODE_ROLE)));
|
665 |
| - return newNode(nodeId, roles); |
| 815 | + return newNode(nodeName, nodeId, roles); |
| 816 | + } |
| 817 | + |
// Convenience overload for callers that don't care about the node name; tests that exercise
// REPLACE-type shutdowns (which target nodes by name) use the two-argument overload instead.
private static DiscoveryNode newNormalNode(String nodeId) {
    return newNormalNode(nodeId, "");
}
|
667 | 821 | }
|
0 commit comments