Skip to content

Commit 9453d69

Browse files
authored
Stateless primary relocations (#97162)
Adds support for proper online relocations of stateless primaries.
1 parent 4eb5dff commit 9453d69

File tree

9 files changed, with 62 additions and 91 deletions.

server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java

Lines changed: 0 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -613,7 +613,6 @@ public IndexShardRoutingTable build() {
613613
assert distinctNodes(shards) : "more than one shard with same id assigned to same node (shards: " + shards + ")";
614614
assert noDuplicatePrimary(shards) : "expected but did not find unique primary in shard routing table: " + shards;
615615
assert noAssignedReplicaWithoutActivePrimary(shards) : "unexpected assigned replica with no active primary: " + shards;
616-
assert noRelocatingUnsearchableShards(shards) : "unexpected RELOCATING unsearchable shard: " + shards;
617616
return new IndexShardRoutingTable(shardId, shards);
618617
}
619618

@@ -664,14 +663,6 @@ static boolean noAssignedReplicaWithoutActivePrimary(List<ShardRouting> shards)
664663
return seenAssignedReplica == false;
665664
}
666665

667-
static boolean noRelocatingUnsearchableShards(List<ShardRouting> shards) {
668-
// this is unsupported until ES-4677 is implemented
669-
for (var shard : shards) {
670-
assert shard.role().isSearchable() || shard.relocating() == false : "unexpected RELOCATING unsearchable shard: " + shard;
671-
}
672-
return true;
673-
}
674-
675666
public static IndexShardRoutingTable.Builder readFrom(StreamInput in) throws IOException {
676667
Index index = new Index(in);
677668
return readFromThin(in, index);

server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

Lines changed: 1 addition & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -459,30 +459,6 @@ public Tuple<ShardRouting, ShardRouting> relocateShard(
459459
return Tuple.tuple(source, target);
460460
}
461461

462-
public void relocateOrReinitializeShard(
463-
ShardRouting startedShard,
464-
String nodeId,
465-
long expectedShardSize,
466-
RoutingChangesObserver changes
467-
) {
468-
if (startedShard.isSearchable() == false) {
469-
remove(startedShard);
470-
var unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, "relocating unsearchable shard");
471-
var assignedShards = assignedShards(startedShard.shardId());
472-
var promotableShard = assignedShards.stream().filter(ShardRouting::isPromotableToPrimary).findAny();
473-
assert promotableShard.isEmpty() : "multiple promotable shards are not supported yet";
474-
// replicas needs to be removed as well as they could not be active when primary is unassigned
475-
// see org.elasticsearch.cluster.routing.IndexShardRoutingTable.Builder.noAssignedReplicaWithoutActivePrimary
476-
for (ShardRouting replica : List.copyOf(assignedShards)) {
477-
remove(replica);
478-
unassignedShards.ignoreShard(replica.moveToUnassigned(unassignedInfo), AllocationStatus.NO_ATTEMPT, changes);
479-
}
480-
initializeShard(startedShard.moveToUnassigned(unassignedInfo), nodeId, null, expectedShardSize, changes);
481-
} else {
482-
relocateShard(startedShard, nodeId, expectedShardSize, changes);
483-
}
484-
}
485-
486462
/**
487463
* Applies the relevant logic to start an initializing shard.
488464
*
@@ -533,7 +509,7 @@ public ShardRouting startShard(
533509
routing,
534510
new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, "primary changed")
535511
);
536-
relocateOrReinitializeShard(
512+
relocateShard(
537513
startedReplica,
538514
sourceShard.relocatingNodeId(),
539515
sourceShard.getExpectedShardSize(),

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -378,7 +378,7 @@ private void moveShards() {
378378
final var moveTarget = findRelocationTarget(shardRouting, assignment.nodeIds());
379379
if (moveTarget != null) {
380380
logger.debug("Moving shard {} from {} to {}", shardRouting.shardId(), shardRouting.currentNodeId(), moveTarget.getId());
381-
routingNodes.relocateOrReinitializeShard(
381+
routingNodes.relocateShard(
382382
shardRouting,
383383
moveTarget.getId(),
384384
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
@@ -443,7 +443,7 @@ private void balance() {
443443
rebalanceTarget.getId()
444444
);
445445

446-
routingNodes.relocateOrReinitializeShard(
446+
routingNodes.relocateShard(
447447
shardRouting,
448448
rebalanceTarget.getId(),
449449
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),

server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -168,7 +168,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
168168
// its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it...
169169
}
170170
allocation.routingNodes()
171-
.relocateOrReinitializeShard(
171+
.relocateShard(
172172
shardRouting,
173173
toRoutingNode.nodeId(),
174174
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 35 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,8 +16,10 @@
1616
import org.elasticsearch.ElasticsearchTimeoutException;
1717
import org.elasticsearch.ExceptionsHelper;
1818
import org.elasticsearch.action.ActionListener;
19+
import org.elasticsearch.action.ActionResponse;
1920
import org.elasticsearch.action.ActionRunnable;
2021
import org.elasticsearch.action.support.ChannelActionListener;
22+
import org.elasticsearch.client.internal.Client;
2123
import org.elasticsearch.cluster.ClusterState;
2224
import org.elasticsearch.cluster.ClusterStateObserver;
2325
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -90,6 +92,7 @@ public static class Actions {
9092
public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/handoff_primary_context";
9193
}
9294

95+
private final Client client;
9396
private final ThreadPool threadPool;
9497

9598
private final TransportService transportService;
@@ -101,12 +104,14 @@ public static class Actions {
101104
private final RecoveriesCollection onGoingRecoveries;
102105

103106
public PeerRecoveryTargetService(
107+
Client client,
104108
ThreadPool threadPool,
105109
TransportService transportService,
106110
RecoverySettings recoverySettings,
107111
ClusterService clusterService,
108112
SnapshotFilesProvider snapshotFilesProvider
109113
) {
114+
this.client = client;
110115
this.threadPool = threadPool;
111116
this.transportService = transportService;
112117
this.recoverySettings = recoverySettings;
@@ -289,7 +294,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
289294
assert preExistingRequest == null;
290295
assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false;
291296
ActionListener.run(cleanupOnly.map(v -> {
292-
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
297+
logger.trace("{} preparing unpromotable shard for recovery", recoveryTarget.shardId());
293298
indexShard.prepareForIndexRecovery();
294299
// Skip unnecessary intermediate stages
295300
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
@@ -303,6 +308,35 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
303308
return;
304309
}
305310

311+
if (indexShard.routingEntry().isSearchable() == false && recoveryState.getPrimary()) {
312+
assert preExistingRequest == null;
313+
assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false;
314+
try (onCompletion) {
315+
client.execute(
316+
StatelessPrimaryRelocationAction.INSTANCE,
317+
new StatelessPrimaryRelocationAction.Request(
318+
recoveryId,
319+
indexShard.shardId(),
320+
transportService.getLocalNode(),
321+
indexShard.routingEntry().allocationId().getId()
322+
),
323+
new ActionListener<>() {
324+
@Override
325+
public void onResponse(ActionResponse.Empty ignored) {
326+
onGoingRecoveries.markRecoveryAsDone(recoveryId);
327+
}
328+
329+
@Override
330+
public void onFailure(Exception e) {
331+
// TODO retries? See RecoveryResponseHandler#handleException
332+
onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(recoveryState, null, e), true);
333+
}
334+
}
335+
);
336+
return;
337+
}
338+
}
339+
306340
record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, String actionName, TransportRequest requestToSend) {}
307341
final ActionListener<StartRecoveryRequestToSend> toSendListener = cleanupOnly.map(r -> {
308342
logger.trace(

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1088,6 +1088,7 @@ protected Node(
10881088
b.bind(PeerRecoveryTargetService.class)
10891089
.toInstance(
10901090
new PeerRecoveryTargetService(
1091+
client,
10911092
threadPool,
10921093
transportService,
10931094
recoverySettings,

server/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesTests.java

Lines changed: 12 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -44,9 +44,8 @@
4444
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
4545
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
4646
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
47+
import static org.hamcrest.Matchers.empty;
4748
import static org.hamcrest.Matchers.equalTo;
48-
import static org.hamcrest.Matchers.hasSize;
49-
import static org.hamcrest.Matchers.nullValue;
5049
import static org.hamcrest.Matchers.oneOf;
5150

5251
public class RoutingNodesTests extends ESAllocationTestCase {
@@ -418,47 +417,14 @@ public void testNodeInterleavedShardIterator() {
418417
}
419418

420419
public void testMoveShardWithDefaultRole() {
421-
422-
var inSync = randomList(2, 2, UUIDs::randomBase64UUID);
423-
var indexMetadata = IndexMetadata.builder("index")
424-
.settings(indexSettings(Version.CURRENT, 1, 1))
425-
.putInSyncAllocationIds(0, Set.copyOf(inSync))
426-
.build();
427-
428-
var shardId = new ShardId(indexMetadata.getIndex(), 0);
429-
430-
var indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex())
431-
.addShard(TestShardRouting.newShardRouting(shardId, "node-1", null, true, STARTED, ShardRouting.Role.DEFAULT))
432-
.addShard(TestShardRouting.newShardRouting(shardId, "node-2", null, false, STARTED, ShardRouting.Role.DEFAULT))
433-
.build();
434-
435-
var node1 = newNode("node-1");
436-
var node2 = newNode("node-2");
437-
var node3 = newNode("node-3");
438-
439-
var clusterState = ClusterState.builder(ClusterName.DEFAULT)
440-
.metadata(Metadata.builder().put(indexMetadata, false).build())
441-
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3).build())
442-
.routingTable(RoutingTable.builder().add(indexRoutingTable).build())
443-
.build();
444-
445-
var routingNodes = clusterState.getRoutingNodes().mutableCopy();
446-
447-
routingNodes.relocateOrReinitializeShard(
448-
routingNodes.node("node-1").getByShardId(shardId),
449-
"node-3",
450-
0L,
451-
new RoutingChangesObserver() {
452-
}
453-
);
454-
455-
assertThat(routingNodes.node("node-1").getByShardId(shardId).state(), equalTo(RELOCATING));
456-
assertThat(routingNodes.node("node-2").getByShardId(shardId).state(), equalTo(STARTED));
457-
assertThat(routingNodes.node("node-3").getByShardId(shardId).state(), equalTo(INITIALIZING));
420+
runMoveShardRolesTest(ShardRouting.Role.DEFAULT, ShardRouting.Role.DEFAULT);
458421
}
459422

460423
public void testMoveShardWithPromotableOnlyRole() {
424+
runMoveShardRolesTest(ShardRouting.Role.INDEX_ONLY, ShardRouting.Role.SEARCH_ONLY);
425+
}
461426

427+
private void runMoveShardRolesTest(ShardRouting.Role primaryRole, ShardRouting.Role replicaRole) {
462428
var inSync = randomList(2, 2, UUIDs::randomBase64UUID);
463429
var indexMetadata = IndexMetadata.builder("index")
464430
.settings(indexSettings(Version.CURRENT, 1, 1))
@@ -468,8 +434,8 @@ public void testMoveShardWithPromotableOnlyRole() {
468434
var shardId = new ShardId(indexMetadata.getIndex(), 0);
469435

470436
var indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex())
471-
.addShard(TestShardRouting.newShardRouting(shardId, "node-1", null, true, STARTED, ShardRouting.Role.INDEX_ONLY))
472-
.addShard(TestShardRouting.newShardRouting(shardId, "node-2", null, false, STARTED, ShardRouting.Role.SEARCH_ONLY))
437+
.addShard(TestShardRouting.newShardRouting(shardId, "node-1", null, true, STARTED, primaryRole))
438+
.addShard(TestShardRouting.newShardRouting(shardId, "node-2", null, false, STARTED, replicaRole))
473439
.build();
474440

475441
var node1 = newNode("node-1");
@@ -484,18 +450,13 @@ public void testMoveShardWithPromotableOnlyRole() {
484450

485451
var routingNodes = clusterState.getRoutingNodes().mutableCopy();
486452

487-
routingNodes.relocateOrReinitializeShard(
488-
routingNodes.node("node-1").getByShardId(shardId),
489-
"node-3",
490-
0L,
491-
new RoutingChangesObserver() {
492-
}
493-
);
453+
routingNodes.relocateShard(routingNodes.node("node-1").getByShardId(shardId), "node-3", 0L, new RoutingChangesObserver() {
454+
});
494455

495-
assertThat(routingNodes.node("node-1").getByShardId(shardId), nullValue());
496-
assertThat(routingNodes.node("node-2").getByShardId(shardId), nullValue());
456+
assertThat(routingNodes.node("node-1").getByShardId(shardId).state(), equalTo(RELOCATING));
457+
assertThat(routingNodes.node("node-2").getByShardId(shardId).state(), equalTo(STARTED));
497458
assertThat(routingNodes.node("node-3").getByShardId(shardId).state(), equalTo(INITIALIZING));
498-
assertThat(routingNodes.unassigned().ignored(), hasSize(1));
459+
assertThat(routingNodes.unassigned().ignored(), empty());
499460
}
500461

501462
private boolean assertShardStats(RoutingNodes routingNodes) {

server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -532,7 +532,9 @@ private IndicesClusterStateService createIndicesClusterStateService(
532532
threadPool,
533533
List.of()
534534
);
535+
final NodeClient client = mock(NodeClient.class);
535536
final PeerRecoveryTargetService recoveryTargetService = new PeerRecoveryTargetService(
537+
client,
536538
threadPool,
537539
transportService,
538540
null,
@@ -541,7 +543,6 @@ private IndicesClusterStateService createIndicesClusterStateService(
541543
);
542544
final ShardStateAction shardStateAction = mock(ShardStateAction.class);
543545
final PrimaryReplicaSyncer primaryReplicaSyncer = mock(PrimaryReplicaSyncer.class);
544-
final NodeClient client = mock(NodeClient.class);
545546
return new IndicesClusterStateService(
546547
settings,
547548
indicesService,

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1863,7 +1863,14 @@ protected void assertSnapshotOrGenericThread() {
18631863
indicesService,
18641864
clusterService,
18651865
threadPool,
1866-
new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService, snapshotFilesProvider),
1866+
new PeerRecoveryTargetService(
1867+
client,
1868+
threadPool,
1869+
transportService,
1870+
recoverySettings,
1871+
clusterService,
1872+
snapshotFilesProvider
1873+
),
18671874
shardStateAction,
18681875
repositoriesService,
18691876
searchService,

0 commit comments

Comments
 (0)