Skip to content

Commit 339b0b3

Browse files
committed
IndicesClusterStateService should replace an init. replica with an init. primary with the same aId (#32374)
In rare cases it is possible that a node gets an instruction to replace a replica shard that's in `POST_RECOVERY` with a new initializing primary with the same allocation id. This can happen by batching cluster states that include the starting of the replica, with closing of the index, opening it up again and allocating the primary shard to the node in question. The node should then clean up its initializing replica and replace it with a new initializing primary. I'm not sure whether the test I added really adds enough value, as existing tests found this. The main reason I added it is to allow for simpler reproduction and to double check that I fixed it. I'm open to discussing whether we should keep it. Closes #32308
1 parent 7e1a1fe commit 339b0b3

File tree

4 files changed

+84
-8
lines changed

4 files changed

+84
-8
lines changed

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,12 @@ private void removeShards(final ClusterState state) {
420420
// state may result in a new shard being initialized while having the same allocation id as the currently started shard.
421421
logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting);
422422
indexService.removeShard(shardId.id(), "removing shard (stale copy)");
423+
} else if (newShardRouting.primary() && currentRoutingEntry.primary() == false && newShardRouting.initializing()) {
424+
assert currentRoutingEntry.initializing() : currentRoutingEntry; // see above if clause
425+
// this can happen when cluster state batching batches activation of the shard, closing an index, reopening it
426+
// and assigning an initializing primary to this node
427+
logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting);
428+
indexService.removeShard(shardId.id(), "removing shard (stale copy)");
423429
}
424430
}
425431
}

server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@
2727
import org.elasticsearch.cluster.metadata.MetaData;
2828
import org.elasticsearch.cluster.node.DiscoveryNode;
2929
import org.elasticsearch.cluster.node.DiscoveryNodes;
30+
import org.elasticsearch.cluster.routing.AllocationId;
3031
import org.elasticsearch.cluster.routing.IndexRoutingTable;
3132
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
3233
import org.elasticsearch.cluster.routing.RoutingTable;
3334
import org.elasticsearch.cluster.routing.RoutingTable.Builder;
35+
import org.elasticsearch.cluster.routing.ShardRouting;
3436
import org.elasticsearch.cluster.routing.ShardRoutingState;
3537
import org.elasticsearch.cluster.routing.TestShardRouting;
3638
import org.elasticsearch.cluster.routing.UnassignedInfo;
@@ -44,6 +46,7 @@
4446
import java.util.HashSet;
4547
import java.util.List;
4648
import java.util.Set;
49+
import java.util.stream.Collectors;
4750

4851
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
4952
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@@ -93,7 +96,8 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard
9396
IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder()
9497
.put(SETTING_VERSION_CREATED, Version.CURRENT)
9598
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
96-
.put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm).build();
99+
.put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm)
100+
.build();
97101

98102
RoutingTable.Builder routing = new RoutingTable.Builder();
99103
routing.addAsNew(indexMetaData);
@@ -138,12 +142,19 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard
138142
TestShardRouting.newShardRouting(index, shardId.id(), replicaNode, relocatingNode, false, replicaState,
139143
unassignedInfo));
140144
}
145+
final IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build();
146+
147+
IndexMetaData.Builder indexMetaDataBuilder = new IndexMetaData.Builder(indexMetaData);
148+
indexMetaDataBuilder.putInSyncAllocationIds(0,
149+
indexShardRoutingTable.activeShards().stream().map(ShardRouting::allocationId).map(AllocationId::getId)
150+
.collect(Collectors.toSet())
151+
);
141152

142153
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
143154
state.nodes(discoBuilder);
144-
state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded());
155+
state.metaData(MetaData.builder().put(indexMetaDataBuilder.build(), false).generateClusterUuidIfNeeded());
145156
state.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(indexMetaData.getIndex())
146-
.addIndexShard(indexShardRoutingBuilder.build())).build());
157+
.addIndexShard(indexShardRoutingTable)).build());
147158
return state.build();
148159
}
149160

@@ -272,21 +283,21 @@ public static ClusterState stateWithAssignedPrimariesAndOneReplica(String index,
272283
state.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build());
273284
return state.build();
274285
}
275-
276-
286+
287+
277288
/**
278289
* Creates cluster state with several indexes, shards and replicas and all shards STARTED.
279290
*/
280291
public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indices, int numberOfShards, int numberOfReplicas) {
281292

282-
int numberOfDataNodes = numberOfReplicas + 1;
293+
int numberOfDataNodes = numberOfReplicas + 1;
283294
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
284295
for (int i = 0; i < numberOfDataNodes + 1; i++) {
285296
final DiscoveryNode node = newNode(i);
286297
discoBuilder = discoBuilder.add(node);
287298
}
288299
discoBuilder.localNodeId(newNode(0).getId());
289-
discoBuilder.masterNodeId(newNode(numberOfDataNodes + 1).getId());
300+
discoBuilder.masterNodeId(newNode(numberOfDataNodes + 1).getId());
290301
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
291302
state.nodes(discoBuilder);
292303
Builder routingTableBuilder = RoutingTable.builder();
@@ -316,7 +327,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice
316327
state.metaData(metadataBuilder);
317328
state.routingTable(routingTableBuilder.build());
318329
return state.build();
319-
}
330+
}
320331

321332
/**
322333
* Creates cluster state with an index that has one shard and as many replicas as numberOfReplicas.

server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ public void injectRandomFailures() {
7474
enableRandomFailures = randomBoolean();
7575
}
7676

77+
protected void disableRandomFailures() {
78+
enableRandomFailures = false;
79+
}
80+
7781
protected void failRandomly() {
7882
if (enableRandomFailures && rarely()) {
7983
throw new RuntimeException("dummy test failure");

server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.elasticsearch.discovery.DiscoverySettings;
5050
import org.elasticsearch.index.Index;
5151
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
52+
import org.elasticsearch.index.shard.ShardId;
5253
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
5354
import org.elasticsearch.repositories.RepositoriesService;
5455
import org.elasticsearch.threadpool.TestThreadPool;
@@ -75,6 +76,7 @@
7576
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
7677
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
7778
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
79+
import static org.hamcrest.Matchers.equalTo;
7880
import static org.mockito.Mockito.mock;
7981
import static org.mockito.Mockito.when;
8082

@@ -198,6 +200,59 @@ public void testJoiningNewClusterOnlyRemovesInMemoryIndexStructures() {
198200
}
199201
}
200202

203+
/**
204+
* In rare cases it is possible that a node gets an instruction to replace a replica
205+
* shard that's in POST_RECOVERY with a new initializing primary with the same allocation id.
206+
* This can happen by batching cluster states that include the starting of the replica, with
207+
* closing of the index, opening it up again and allocating the primary shard to the node in
208+
* question. The node should then clean up its initializing replica and replace it with a new
209+
* initializing primary.
210+
*/
211+
public void testInitializingPrimaryRemovesInitializingReplicaWithSameAID() {
212+
disableRandomFailures();
213+
String index = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT);
214+
ClusterState state = ClusterStateCreationUtils.state(index, randomBoolean(),
215+
ShardRoutingState.STARTED, ShardRoutingState.INITIALIZING);
216+
217+
// the initial state which is derived from the newly created cluster state but doesn't contain the index
218+
ClusterState previousState = ClusterState.builder(state)
219+
.metaData(MetaData.builder(state.metaData()).remove(index))
220+
.routingTable(RoutingTable.builder().build())
221+
.build();
222+
223+
// pick a data node that has shards assigned to it, on which to simulate the cluster state change event that adds the index
224+
final ShardRouting shardRouting = state.routingTable().index(index).shard(0).replicaShards().get(0);
225+
final ShardId shardId = shardRouting.shardId();
226+
DiscoveryNode node = state.nodes().get(shardRouting.currentNodeId());
227+
228+
// simulate the cluster state change on the node
229+
ClusterState localState = adaptClusterStateToLocalNode(state, node);
230+
ClusterState previousLocalState = adaptClusterStateToLocalNode(previousState, node);
231+
IndicesClusterStateService indicesCSSvc = createIndicesClusterStateService(node, RecordingIndicesService::new);
232+
indicesCSSvc.start();
233+
indicesCSSvc.applyClusterState(new ClusterChangedEvent("cluster state change that adds the index", localState, previousLocalState));
234+
previousState = state;
235+
236+
// start the replica
237+
state = cluster.applyStartedShards(state, state.routingTable().index(index).shard(0).replicaShards());
238+
239+
// close the index and open it up again (this will sometimes swap roles between primary and replica)
240+
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(state.metaData().index(index).getIndex().getName());
241+
state = cluster.closeIndices(state, closeIndexRequest);
242+
OpenIndexRequest openIndexRequest = new OpenIndexRequest(state.metaData().index(index).getIndex().getName());
243+
state = cluster.openIndices(state, openIndexRequest);
244+
245+
localState = adaptClusterStateToLocalNode(state, node);
246+
previousLocalState = adaptClusterStateToLocalNode(previousState, node);
247+
248+
indicesCSSvc.applyClusterState(new ClusterChangedEvent("new cluster state", localState, previousLocalState));
249+
250+
final MockIndexShard shardOrNull = ((RecordingIndicesService) indicesCSSvc.indicesService).getShardOrNull(shardId);
251+
assertThat(shardOrNull == null ? null : shardOrNull.routingEntry(),
252+
equalTo(state.getRoutingNodes().node(node.getId()).getByShardId(shardId)));
253+
254+
}
255+
201256
public ClusterState randomInitialClusterState(Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap,
202257
Supplier<MockIndicesService> indicesServiceSupplier) {
203258
List<DiscoveryNode> allNodes = new ArrayList<>();

0 commit comments

Comments
 (0)