From 5e4a9ec49081d9bfad2aa430d72b0382e341baed Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 28 Jun 2023 09:00:03 +0100 Subject: [PATCH 1/3] Stateless primary relocations Adds support for proper online relocations of stateless primaries. --- .../routing/IndexShardRoutingTable.java | 9 ----- .../cluster/routing/RoutingNodes.java | 26 +------------- .../allocator/DesiredBalanceReconciler.java | 4 +-- .../command/MoveAllocationCommand.java | 2 +- .../recovery/PeerRecoveryTargetService.java | 36 ++++++++++++++++++- .../java/org/elasticsearch/node/Node.java | 1 + .../routing/allocation/RoutingNodesTests.java | 18 +++------- ...ClusterStateServiceRandomUpdatesTests.java | 3 +- .../snapshots/SnapshotResiliencyTests.java | 9 ++++- 9 files changed, 54 insertions(+), 54 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index cc0bb6fd4323b..bd15d924c9c19 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -613,7 +613,6 @@ public IndexShardRoutingTable build() { assert distinctNodes(shards) : "more than one shard with same id assigned to same node (shards: " + shards + ")"; assert noDuplicatePrimary(shards) : "expected but did not find unique primary in shard routing table: " + shards; assert noAssignedReplicaWithoutActivePrimary(shards) : "unexpected assigned replica with no active primary: " + shards; - assert noRelocatingUnsearchableShards(shards) : "unexpected RELOCATING unsearchable shard: " + shards; return new IndexShardRoutingTable(shardId, shards); } @@ -664,14 +663,6 @@ static boolean noAssignedReplicaWithoutActivePrimary(List shards) return seenAssignedReplica == false; } - static boolean noRelocatingUnsearchableShards(List shards) { - // this is unsupported until ES-4677 is implemented - for (var shard : shards) { - assert shard.role().isSearchable() || shard.relocating() == false : "unexpected RELOCATING unsearchable shard: " + shard; - } - return true; - } - public static IndexShardRoutingTable.Builder readFrom(StreamInput in) throws IOException { Index index = new Index(in); return readFromThin(in, index); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 0856472bccf7f..c4f827f807502 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -459,30 +459,6 @@ public Tuple relocateShard( return Tuple.tuple(source, target); } - public void relocateOrReinitializeShard( - ShardRouting startedShard, - String nodeId, - long expectedShardSize, - RoutingChangesObserver changes - ) { - if (startedShard.isSearchable() == false) { - remove(startedShard); - var unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, "relocating unsearchable shard"); - var assignedShards = assignedShards(startedShard.shardId()); - var promotableShard = assignedShards.stream().filter(ShardRouting::isPromotableToPrimary).findAny(); - assert promotableShard.isEmpty() : "multiple promotable shards are not supported yet"; - // replicas needs to be removed as well as they could not be active when primary is unassigned - // see org.elasticsearch.cluster.routing.IndexShardRoutingTable.Builder.noAssignedReplicaWithoutActivePrimary - for (ShardRouting replica : List.copyOf(assignedShards)) { - remove(replica); - unassignedShards.ignoreShard(replica.moveToUnassigned(unassignedInfo), AllocationStatus.NO_ATTEMPT, changes); - } - initializeShard(startedShard.moveToUnassigned(unassignedInfo), nodeId, null, expectedShardSize, changes); - } else { - relocateShard(startedShard, nodeId, expectedShardSize, changes); - } - } - /** * Applies the relevant logic to start an initializing shard. * @@ -533,7 +509,7 @@ public ShardRouting startShard( routing, new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, "primary changed") ); - relocateOrReinitializeShard( + relocateShard( startedReplica, sourceShard.relocatingNodeId(), sourceShard.getExpectedShardSize(), diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index b286f74bde308..f99fb71525e55 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -378,7 +378,7 @@ private void moveShards() { final var moveTarget = findRelocationTarget(shardRouting, assignment.nodeIds()); if (moveTarget != null) { logger.debug("Moving shard {} from {} to {}", shardRouting.shardId(), shardRouting.currentNodeId(), moveTarget.getId()); - routingNodes.relocateOrReinitializeShard( + routingNodes.relocateShard( shardRouting, moveTarget.getId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), @@ -443,7 +443,7 @@ private void balance() { rebalanceTarget.getId() ); - routingNodes.relocateOrReinitializeShard( + routingNodes.relocateShard( shardRouting, rebalanceTarget.getId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java index 23a579988d583..8b9d5a402634f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java @@ -168,7 +168,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) // its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it... } allocation.routingNodes() - .relocateOrReinitializeShard( + .relocateShard( shardRouting, toRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index b3f2c60f90740..8eb3894c7c87b 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -16,8 +16,10 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -90,6 +92,7 @@ public static class Actions { public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/handoff_primary_context"; } + private final Client client; private final ThreadPool threadPool; private final TransportService transportService; @@ -101,12 +104,14 @@ public static class Actions { private final RecoveriesCollection onGoingRecoveries; public PeerRecoveryTargetService( + Client client, ThreadPool threadPool, TransportService transportService, RecoverySettings recoverySettings, ClusterService clusterService, SnapshotFilesProvider snapshotFilesProvider ) { + this.client = client; this.threadPool = threadPool; this.transportService = transportService; this.recoverySettings = recoverySettings; @@ -289,7 +294,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi assert preExistingRequest == null; assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false; ActionListener.run(cleanupOnly.map(v -> { - logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); + logger.trace("{} preparing unpromotable shard for recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); // Skip unnecessary intermediate stages recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); @@ -303,6 +308,35 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi return; } + if (indexShard.routingEntry().isSearchable() == false && recoveryState.getPrimary()) { + assert preExistingRequest == null; + assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false; + try (onCompletion) { + client.execute( + StatelessPrimaryRelocationAction.INSTANCE, + new StatelessPrimaryRelocationAction.Request( + recoveryId, + indexShard.shardId(), + transportService.getLocalNode(), + indexShard.routingEntry().allocationId().getId() + ), + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty ignored) { + onGoingRecoveries.markRecoveryAsDone(recoveryId); + } + + @Override + public void onFailure(Exception e) { + // TODO retries? See RecoveryResponseHandler#handleException + onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(recoveryState, null, e), true); + } + } + ); + return; + } + } + record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, String actionName, TransportRequest requestToSend) {} final ActionListener toSendListener = cleanupOnly.map(r -> { logger.trace( diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index b67930db111e8..c3db9cc118356 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -1088,6 +1088,7 @@ protected Node( b.bind(PeerRecoveryTargetService.class) .toInstance( new PeerRecoveryTargetService( + client, threadPool, transportService, recoverySettings, diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesTests.java index 56a276b907ae4..5ec1a5dca92d6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesTests.java @@ -444,13 +444,8 @@ public void testMoveShardWithDefaultRole() { var routingNodes = clusterState.getRoutingNodes().mutableCopy(); - routingNodes.relocateOrReinitializeShard( - routingNodes.node("node-1").getByShardId(shardId), - "node-3", - 0L, - new RoutingChangesObserver() { - } - ); + routingNodes.relocateShard(routingNodes.node("node-1").getByShardId(shardId), "node-3", 0L, new RoutingChangesObserver() { + }); assertThat(routingNodes.node("node-1").getByShardId(shardId).state(), equalTo(RELOCATING)); assertThat(routingNodes.node("node-2").getByShardId(shardId).state(), equalTo(STARTED)); @@ -484,13 +479,8 @@ public void testMoveShardWithPromotableOnlyRole() { var routingNodes = clusterState.getRoutingNodes().mutableCopy(); - routingNodes.relocateOrReinitializeShard( - routingNodes.node("node-1").getByShardId(shardId), - "node-3", - 0L, - new RoutingChangesObserver() { - } - ); + routingNodes.relocateShard(routingNodes.node("node-1").getByShardId(shardId), "node-3", 0L, new RoutingChangesObserver() { + }); assertThat(routingNodes.node("node-1").getByShardId(shardId), nullValue()); assertThat(routingNodes.node("node-2").getByShardId(shardId), nullValue()); diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 095cd38288a12..277d0472d738f 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -532,7 +532,9 @@ private IndicesClusterStateService createIndicesClusterStateService( threadPool, List.of() ); + final NodeClient client = mock(NodeClient.class); final PeerRecoveryTargetService recoveryTargetService = new PeerRecoveryTargetService( + client, threadPool, transportService, null, @@ -541,7 +543,6 @@ private IndicesClusterStateService createIndicesClusterStateService( ); final ShardStateAction shardStateAction = mock(ShardStateAction.class); final PrimaryReplicaSyncer primaryReplicaSyncer = mock(PrimaryReplicaSyncer.class); - final NodeClient client = mock(NodeClient.class); return new IndicesClusterStateService( settings, indicesService, diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 6b23545dc5685..366bb13e609f5 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1863,7 +1863,14 @@ protected void assertSnapshotOrGenericThread() { indicesService, clusterService, threadPool, - new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService, snapshotFilesProvider), + new PeerRecoveryTargetService( + client, + threadPool, + transportService, + recoverySettings, + clusterService, + snapshotFilesProvider + ), shardStateAction, repositoriesService, searchService, From 14701fae67c41be66ffe8543f57a870e7d257b08 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 28 Jun 2023 09:44:39 +0100 Subject: [PATCH 2/3] Fix RoutingNodesTests --- .../cluster/routing/allocation/RoutingNodesTests.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesTests.java index 5ec1a5dca92d6..05f9ad6f45a6f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesTests.java @@ -44,9 +44,8 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.oneOf; public class RoutingNodesTests extends ESAllocationTestCase { @@ -482,10 +481,10 @@ public void testMoveShardWithPromotableOnlyRole() { routingNodes.relocateShard(routingNodes.node("node-1").getByShardId(shardId), "node-3", 0L, new RoutingChangesObserver() { }); - assertThat(routingNodes.node("node-1").getByShardId(shardId), nullValue()); - assertThat(routingNodes.node("node-2").getByShardId(shardId), nullValue()); + assertThat(routingNodes.node("node-1").getByShardId(shardId).state(), equalTo(RELOCATING)); + assertThat(routingNodes.node("node-2").getByShardId(shardId).state(), equalTo(STARTED)); assertThat(routingNodes.node("node-3").getByShardId(shardId).state(), equalTo(INITIALIZING)); - assertThat(routingNodes.unassigned().ignored(), hasSize(1)); + assertThat(routingNodes.unassigned().ignored(), empty()); } private boolean assertShardStats(RoutingNodes routingNodes) { From d4ca54f1db313eca6a7b93b7d9e8b39be09d0129 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 28 Jun 2023 10:32:39 +0100 Subject: [PATCH 3/3] Refactor RoutingNodesTests to reduce duplication --- .../routing/allocation/RoutingNodesTests.java | 40 +++---------------- 1 file changed, 6 insertions(+), 34 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesTests.java index 05f9ad6f45a6f..18adf3ca32c74 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesTests.java @@ -417,42 +417,14 @@ public void testNodeInterleavedShardIterator() { } public void testMoveShardWithDefaultRole() { - - var inSync = randomList(2, 2, UUIDs::randomBase64UUID); - var indexMetadata = IndexMetadata.builder("index") - .settings(indexSettings(Version.CURRENT, 1, 1)) - .putInSyncAllocationIds(0, Set.copyOf(inSync)) - .build(); - - var shardId = new ShardId(indexMetadata.getIndex(), 0); - - var indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()) - .addShard(TestShardRouting.newShardRouting(shardId, "node-1", null, true, STARTED, ShardRouting.Role.DEFAULT)) - .addShard(TestShardRouting.newShardRouting(shardId, "node-2", null, false, STARTED, ShardRouting.Role.DEFAULT)) - .build(); - - var node1 = newNode("node-1"); - var node2 = newNode("node-2"); - var node3 = newNode("node-3"); - - var clusterState = ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().put(indexMetadata, false).build()) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3).build()) - .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) - .build(); - - var routingNodes = clusterState.getRoutingNodes().mutableCopy(); - - routingNodes.relocateShard(routingNodes.node("node-1").getByShardId(shardId), "node-3", 0L, new RoutingChangesObserver() { - }); - - assertThat(routingNodes.node("node-1").getByShardId(shardId).state(), equalTo(RELOCATING)); - assertThat(routingNodes.node("node-2").getByShardId(shardId).state(), equalTo(STARTED)); - assertThat(routingNodes.node("node-3").getByShardId(shardId).state(), equalTo(INITIALIZING)); + runMoveShardRolesTest(ShardRouting.Role.DEFAULT, ShardRouting.Role.DEFAULT); } public void testMoveShardWithPromotableOnlyRole() { + runMoveShardRolesTest(ShardRouting.Role.INDEX_ONLY, ShardRouting.Role.SEARCH_ONLY); + } + private void runMoveShardRolesTest(ShardRouting.Role primaryRole, ShardRouting.Role replicaRole) { var inSync = randomList(2, 2, UUIDs::randomBase64UUID); var indexMetadata = IndexMetadata.builder("index") .settings(indexSettings(Version.CURRENT, 1, 1)) @@ -462,8 +434,8 @@ public void testMoveShardWithPromotableOnlyRole() { var shardId = new ShardId(indexMetadata.getIndex(), 0); var indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()) - .addShard(TestShardRouting.newShardRouting(shardId, "node-1", null, true, STARTED, ShardRouting.Role.INDEX_ONLY)) - .addShard(TestShardRouting.newShardRouting(shardId, "node-2", null, false, STARTED, ShardRouting.Role.SEARCH_ONLY)) + .addShard(TestShardRouting.newShardRouting(shardId, "node-1", null, true, STARTED, primaryRole)) + .addShard(TestShardRouting.newShardRouting(shardId, "node-2", null, false, STARTED, replicaRole)) .build(); var node1 = newNode("node-1");