Skip to content

Stateless primary relocations #97162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,6 @@ public IndexShardRoutingTable build() {
assert distinctNodes(shards) : "more than one shard with same id assigned to same node (shards: " + shards + ")";
assert noDuplicatePrimary(shards) : "expected but did not find unique primary in shard routing table: " + shards;
assert noAssignedReplicaWithoutActivePrimary(shards) : "unexpected assigned replica with no active primary: " + shards;
assert noRelocatingUnsearchableShards(shards) : "unexpected RELOCATING unsearchable shard: " + shards;
return new IndexShardRoutingTable(shardId, shards);
}

Expand Down Expand Up @@ -664,14 +663,6 @@ static boolean noAssignedReplicaWithoutActivePrimary(List<ShardRouting> shards)
return seenAssignedReplica == false;
}

static boolean noRelocatingUnsearchableShards(List<ShardRouting> shards) {
// relocating unsearchable shards is unsupported until ES-4677 is implemented
for (ShardRouting shardRouting : shards) {
boolean searchableOrNotRelocating = shardRouting.role().isSearchable() || shardRouting.relocating() == false;
assert searchableOrNotRelocating : "unexpected RELOCATING unsearchable shard: " + shardRouting;
}
return true;
}

public static IndexShardRoutingTable.Builder readFrom(StreamInput in) throws IOException {
Index index = new Index(in);
return readFromThin(in, index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,30 +459,6 @@ public Tuple<ShardRouting, ShardRouting> relocateShard(
return Tuple.tuple(source, target);
}

/**
 * Moves a started shard to the given node. A searchable shard is moved via a normal
 * relocation ({@link #relocateShard}); an unsearchable shard is instead removed and
 * re-initialized directly on the target node, since relocating unsearchable shards is
 * not supported (its replicas are moved to ignored-unassigned as well).
 *
 * @param startedShard      the currently started shard to move
 * @param nodeId            the id of the node to move the shard to
 * @param expectedShardSize the expected shard size on the target node
 * @param changes           observer notified of the resulting routing changes
 */
public void relocateOrReinitializeShard(
ShardRouting startedShard,
String nodeId,
long expectedShardSize,
RoutingChangesObserver changes
) {
if (startedShard.isSearchable() == false) {
// unsearchable shards cannot use the normal RELOCATING state: reinitialize on the target instead
remove(startedShard);
var unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, "relocating unsearchable shard");
var assignedShards = assignedShards(startedShard.shardId());
var promotableShard = assignedShards.stream().filter(ShardRouting::isPromotableToPrimary).findAny();
// NOTE(review): startedShard was removed above, so no promotable shard is expected to remain
// among the assigned shards — TODO confirm this holds for all role combinations
assert promotableShard.isEmpty() : "multiple promotable shards are not supported yet";
// replicas needs to be removed as well as they could not be active when primary is unassigned
// see org.elasticsearch.cluster.routing.IndexShardRoutingTable.Builder.noAssignedReplicaWithoutActivePrimary
for (ShardRouting replica : List.copyOf(assignedShards)) {
remove(replica);
unassignedShards.ignoreShard(replica.moveToUnassigned(unassignedInfo), AllocationStatus.NO_ATTEMPT, changes);
}
// re-create the moved shard as INITIALIZING on the target node (null recovery source id)
initializeShard(startedShard.moveToUnassigned(unassignedInfo), nodeId, null, expectedShardSize, changes);
} else {
relocateShard(startedShard, nodeId, expectedShardSize, changes);
}
}

/**
* Applies the relevant logic to start an initializing shard.
*
Expand Down Expand Up @@ -533,7 +509,7 @@ public ShardRouting startShard(
routing,
new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, "primary changed")
);
relocateOrReinitializeShard(
relocateShard(
startedReplica,
sourceShard.relocatingNodeId(),
sourceShard.getExpectedShardSize(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ private void moveShards() {
final var moveTarget = findRelocationTarget(shardRouting, assignment.nodeIds());
if (moveTarget != null) {
logger.debug("Moving shard {} from {} to {}", shardRouting.shardId(), shardRouting.currentNodeId(), moveTarget.getId());
routingNodes.relocateOrReinitializeShard(
routingNodes.relocateShard(
shardRouting,
moveTarget.getId(),
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
Expand Down Expand Up @@ -443,7 +443,7 @@ private void balance() {
rebalanceTarget.getId()
);

routingNodes.relocateOrReinitializeShard(
routingNodes.relocateShard(
shardRouting,
rebalanceTarget.getId(),
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
// its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it...
}
allocation.routingNodes()
.relocateOrReinitializeShard(
.relocateShard(
shardRouting,
toRoutingNode.nodeId(),
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -90,6 +92,7 @@ public static class Actions {
public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/handoff_primary_context";
}

private final Client client;
private final ThreadPool threadPool;

private final TransportService transportService;
Expand All @@ -101,12 +104,14 @@ public static class Actions {
private final RecoveriesCollection onGoingRecoveries;

public PeerRecoveryTargetService(
Client client,
ThreadPool threadPool,
TransportService transportService,
RecoverySettings recoverySettings,
ClusterService clusterService,
SnapshotFilesProvider snapshotFilesProvider
) {
this.client = client;
this.threadPool = threadPool;
this.transportService = transportService;
this.recoverySettings = recoverySettings;
Expand Down Expand Up @@ -289,7 +294,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
assert preExistingRequest == null;
assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false;
ActionListener.run(cleanupOnly.map(v -> {
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
logger.trace("{} preparing unpromotable shard for recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
// Skip unnecessary intermediate stages
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
Expand All @@ -303,6 +308,35 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
return;
}

if (indexShard.routingEntry().isSearchable() == false && recoveryState.getPrimary()) {
assert preExistingRequest == null;
assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false;
try (onCompletion) {
client.execute(
StatelessPrimaryRelocationAction.INSTANCE,
new StatelessPrimaryRelocationAction.Request(
recoveryId,
indexShard.shardId(),
transportService.getLocalNode(),
indexShard.routingEntry().allocationId().getId()
),
new ActionListener<>() {
@Override
public void onResponse(ActionResponse.Empty ignored) {
onGoingRecoveries.markRecoveryAsDone(recoveryId);
}

@Override
public void onFailure(Exception e) {
// TODO retries? See RecoveryResponseHandler#handleException
onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(recoveryState, null, e), true);
}
}
);
return;
}
}

record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, String actionName, TransportRequest requestToSend) {}
final ActionListener<StartRecoveryRequestToSend> toSendListener = cleanupOnly.map(r -> {
logger.trace(
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,7 @@ protected Node(
b.bind(PeerRecoveryTargetService.class)
.toInstance(
new PeerRecoveryTargetService(
client,
threadPool,
transportService,
recoverySettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.oneOf;

public class RoutingNodesTests extends ESAllocationTestCase {
Expand Down Expand Up @@ -418,47 +417,14 @@ public void testNodeInterleavedShardIterator() {
}

public void testMoveShardWithDefaultRole() {

var inSync = randomList(2, 2, UUIDs::randomBase64UUID);
var indexMetadata = IndexMetadata.builder("index")
.settings(indexSettings(Version.CURRENT, 1, 1))
.putInSyncAllocationIds(0, Set.copyOf(inSync))
.build();

var shardId = new ShardId(indexMetadata.getIndex(), 0);

var indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex())
.addShard(TestShardRouting.newShardRouting(shardId, "node-1", null, true, STARTED, ShardRouting.Role.DEFAULT))
.addShard(TestShardRouting.newShardRouting(shardId, "node-2", null, false, STARTED, ShardRouting.Role.DEFAULT))
.build();

var node1 = newNode("node-1");
var node2 = newNode("node-2");
var node3 = newNode("node-3");

var clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(Metadata.builder().put(indexMetadata, false).build())
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3).build())
.routingTable(RoutingTable.builder().add(indexRoutingTable).build())
.build();

var routingNodes = clusterState.getRoutingNodes().mutableCopy();

routingNodes.relocateOrReinitializeShard(
routingNodes.node("node-1").getByShardId(shardId),
"node-3",
0L,
new RoutingChangesObserver() {
}
);

assertThat(routingNodes.node("node-1").getByShardId(shardId).state(), equalTo(RELOCATING));
assertThat(routingNodes.node("node-2").getByShardId(shardId).state(), equalTo(STARTED));
assertThat(routingNodes.node("node-3").getByShardId(shardId).state(), equalTo(INITIALIZING));
runMoveShardRolesTest(ShardRouting.Role.DEFAULT, ShardRouting.Role.DEFAULT);
}

// Moves a shard whose primary is INDEX_ONLY (promotable, not searchable) with a
// SEARCH_ONLY replica, and verifies the resulting routing-table changes.
public void testMoveShardWithPromotableOnlyRole() {
runMoveShardRolesTest(ShardRouting.Role.INDEX_ONLY, ShardRouting.Role.SEARCH_ONLY);
}

private void runMoveShardRolesTest(ShardRouting.Role primaryRole, ShardRouting.Role replicaRole) {
var inSync = randomList(2, 2, UUIDs::randomBase64UUID);
var indexMetadata = IndexMetadata.builder("index")
.settings(indexSettings(Version.CURRENT, 1, 1))
Expand All @@ -468,8 +434,8 @@ public void testMoveShardWithPromotableOnlyRole() {
var shardId = new ShardId(indexMetadata.getIndex(), 0);

var indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex())
.addShard(TestShardRouting.newShardRouting(shardId, "node-1", null, true, STARTED, ShardRouting.Role.INDEX_ONLY))
.addShard(TestShardRouting.newShardRouting(shardId, "node-2", null, false, STARTED, ShardRouting.Role.SEARCH_ONLY))
.addShard(TestShardRouting.newShardRouting(shardId, "node-1", null, true, STARTED, primaryRole))
.addShard(TestShardRouting.newShardRouting(shardId, "node-2", null, false, STARTED, replicaRole))
.build();

var node1 = newNode("node-1");
Expand All @@ -484,18 +450,13 @@ public void testMoveShardWithPromotableOnlyRole() {

var routingNodes = clusterState.getRoutingNodes().mutableCopy();

routingNodes.relocateOrReinitializeShard(
routingNodes.node("node-1").getByShardId(shardId),
"node-3",
0L,
new RoutingChangesObserver() {
}
);
routingNodes.relocateShard(routingNodes.node("node-1").getByShardId(shardId), "node-3", 0L, new RoutingChangesObserver() {
});

assertThat(routingNodes.node("node-1").getByShardId(shardId), nullValue());
assertThat(routingNodes.node("node-2").getByShardId(shardId), nullValue());
assertThat(routingNodes.node("node-1").getByShardId(shardId).state(), equalTo(RELOCATING));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only part that I had not seen before in your changes. I see we're asserting an intermediate state here where the relocation has not fully completed yet (it's in transit from node-1 to node-3). Can we also wait for the completion of the relocation here and assert that it's finally only in node-3?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we hadn't run these tests before, only the serverless ones, so I expect there might be a few changes needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no actual relocation in progress here; this is only to verify that the routing table changes as we expect. This test (and the one above) was added to support the temporary change that triggered stateless relocations by just closing all the shards, so now this test really verifies that the routing table behaves the same for stateless and regular relocations. I've removed the duplication in d4ca54f.

assertThat(routingNodes.node("node-2").getByShardId(shardId).state(), equalTo(STARTED));
assertThat(routingNodes.node("node-3").getByShardId(shardId).state(), equalTo(INITIALIZING));
assertThat(routingNodes.unassigned().ignored(), hasSize(1));
assertThat(routingNodes.unassigned().ignored(), empty());
}

private boolean assertShardStats(RoutingNodes routingNodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,9 @@ private IndicesClusterStateService createIndicesClusterStateService(
threadPool,
List.of()
);
final NodeClient client = mock(NodeClient.class);
final PeerRecoveryTargetService recoveryTargetService = new PeerRecoveryTargetService(
client,
threadPool,
transportService,
null,
Expand All @@ -541,7 +543,6 @@ private IndicesClusterStateService createIndicesClusterStateService(
);
final ShardStateAction shardStateAction = mock(ShardStateAction.class);
final PrimaryReplicaSyncer primaryReplicaSyncer = mock(PrimaryReplicaSyncer.class);
final NodeClient client = mock(NodeClient.class);
return new IndicesClusterStateService(
settings,
indicesService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1863,7 +1863,14 @@ protected void assertSnapshotOrGenericThread() {
indicesService,
clusterService,
threadPool,
new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService, snapshotFilesProvider),
new PeerRecoveryTargetService(
client,
threadPool,
transportService,
recoverySettings,
clusterService,
snapshotFilesProvider
),
shardStateAction,
repositoriesService,
searchService,
Expand Down