Skip to content

Commit 5856c39

Browse files
authored
A replica can be promoted and started in one cluster state update (#32042)
When a replica is fully recovered (i.e., in `POST_RECOVERY` state) we send a request to the master to start the shard. The master changes the state of the replica and publishes a cluster state to that effect. In certain cases, that cluster state can be processed on the node hosting the replica *together* with a cluster state that promotes that, now started, replica to a primary. This can happen due to cluster state batched processing or if the master died after having committed the cluster state that starts the shard but before publishing it to the node with the replica. If the master also held the primary shard, the new master node will remove the primary (as it failed) and will also immediately promote the replica (thinking it is started). Sadly our code in IndexShard didn't allow for this which caused [assertions](https://github.com/elastic/elasticsearch/blob/13917162ad5c59a96ccb4d6a81a5044546c45c22/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java#L482) to be tripped in some of our tests runs.
1 parent ef5e8d8 commit 5856c39

File tree

8 files changed

+126
-73
lines changed

8 files changed

+126
-73
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+14-5
Original file line numberDiff line numberDiff line change
@@ -413,10 +413,9 @@ public void updateShardState(final ShardRouting newRouting,
413413

414414
if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
415415
assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;
416-
417-
if (newRouting.primary() && currentRouting.isRelocationTarget() == false) {
418-
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
419-
}
416+
assert currentRouting.isRelocationTarget() == false || currentRouting.primary() == false ||
417+
replicationTracker.isPrimaryMode() :
418+
"a primary relocation is completed by the master, but primary mode is not active " + currentRouting;
420419

421420
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
422421
} else if (currentRouting.primary() && currentRouting.relocating() && replicationTracker.isPrimaryMode() == false &&
@@ -432,7 +431,12 @@ public void updateShardState(final ShardRouting newRouting,
432431
final CountDownLatch shardStateUpdated = new CountDownLatch(1);
433432

434433
if (newRouting.primary()) {
435-
if (newPrimaryTerm != primaryTerm) {
434+
if (newPrimaryTerm == primaryTerm) {
435+
if (currentRouting.initializing() && currentRouting.isRelocationTarget() == false && newRouting.active()) {
436+
// the master started a recovering primary, activate primary mode.
437+
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
438+
}
439+
} else {
436440
assert currentRouting.primary() == false : "term is only increased as part of primary promotion";
437441
/* Note that due to cluster state batching an initializing primary shard term can failed and re-assigned
438442
* in one state causing it's term to be incremented. Note that if both current shard state and new
@@ -521,6 +525,11 @@ public void onFailure(Exception e) {
521525
}
522526
// set this last, once we finished updating all internal state.
523527
this.shardRouting = newRouting;
528+
529+
assert this.shardRouting.primary() == false ||
530+
this.shardRouting.started() == false || // note that we use started and not active to avoid relocating shards
531+
this.replicationTracker.isPrimaryMode()
532+
: "an started primary must be in primary mode " + this.shardRouting;
524533
shardStateUpdated.countDown();
525534
}
526535
if (currentRouting != null && currentRouting.active() == false && newRouting.active()) {

server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ public void testSeqNoCollision() throws Exception {
363363
logger.info("--> Promote replica2 as the primary");
364364
shards.promoteReplicaToPrimary(replica2);
365365
logger.info("--> Recover replica3 from replica2");
366-
recoverReplica(replica3, replica2);
366+
recoverReplica(replica3, replica2, true);
367367
try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) {
368368
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
369369
assertThat(snapshot.next(), equalTo(op2));

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

+22-45
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ public void testPersistenceStateMetadataPersistence() throws Exception {
226226
}
227227

228228
public void testFailShard() throws Exception {
229+
allowShardFailures();
229230
IndexShard shard = newStartedShard();
230231
final ShardPath shardPath = shard.shardPath();
231232
assertNotNull(shardPath);
@@ -309,7 +310,8 @@ public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOExc
309310
}
310311

311312
public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBarrierException, InterruptedException {
312-
final IndexShard indexShard = newStartedShard(false);
313+
final IndexShard indexShard = newShard(false);
314+
recoveryEmptyReplica(indexShard, randomBoolean());
313315

314316
final int operations = scaledRandomIntBetween(1, 64);
315317
final CyclicBarrier barrier = new CyclicBarrier(1 + operations);
@@ -353,20 +355,10 @@ public void onFailure(Exception e) {
353355
barrier.await();
354356
latch.await();
355357

356-
// promote the replica
357358
final ShardRouting replicaRouting = indexShard.routingEntry();
358-
final ShardRouting primaryRouting =
359-
newShardRouting(
360-
replicaRouting.shardId(),
361-
replicaRouting.currentNodeId(),
362-
null,
363-
true,
364-
ShardRoutingState.STARTED,
365-
replicaRouting.allocationId());
366-
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
367-
0L, Collections.singleton(primaryRouting.allocationId().getId()),
368-
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(),
369-
Collections.emptySet());
359+
promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
360+
new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());
361+
370362

371363
final int delayedOperations = scaledRandomIntBetween(1, 64);
372364
final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations);
@@ -428,8 +420,9 @@ public void onFailure(Exception e) {
428420
* 1) Internal state (ala ReplicationTracker) have been updated
429421
* 2) Primary term is set to the new term
430422
*/
431-
public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierException, InterruptedException {
432-
final IndexShard indexShard = newStartedShard(false);
423+
public void testPublishingOrderOnPromotion() throws IOException, InterruptedException, BrokenBarrierException {
424+
final IndexShard indexShard = newShard(false);
425+
recoveryEmptyReplica(indexShard, randomBoolean());
433426
final long promotedTerm = indexShard.getPrimaryTerm() + 1;
434427
final CyclicBarrier barrier = new CyclicBarrier(2);
435428
final AtomicBoolean stop = new AtomicBoolean();
@@ -448,18 +441,10 @@ public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierEx
448441
});
449442
thread.start();
450443

451-
final ShardRouting replicaRouting = indexShard.routingEntry();
452-
final ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true,
453-
ShardRoutingState.STARTED, replicaRouting.allocationId());
454-
455-
456-
final Set<String> inSyncAllocationIds = Collections.singleton(primaryRouting.allocationId().getId());
457-
final IndexShardRoutingTable routingTable =
458-
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build();
459444
barrier.await();
460-
// promote the replica
461-
indexShard.updateShardState(primaryRouting, promotedTerm, (shard, listener) -> {}, 0L, inSyncAllocationIds, routingTable,
462-
Collections.emptySet());
445+
final ShardRouting replicaRouting = indexShard.routingEntry();
446+
promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
447+
new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());
463448

464449
stop.set(true);
465450
thread.join();
@@ -468,7 +453,8 @@ public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierEx
468453

469454

470455
public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
471-
final IndexShard indexShard = newStartedShard(false);
456+
final IndexShard indexShard = newShard(false);
457+
recoveryEmptyReplica(indexShard, randomBoolean());
472458

473459
// most of the time this is large enough that most of the time there will be at least one gap
474460
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
@@ -479,17 +465,8 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
479465

480466
// promote the replica
481467
final ShardRouting replicaRouting = indexShard.routingEntry();
482-
final ShardRouting primaryRouting =
483-
newShardRouting(
484-
replicaRouting.shardId(),
485-
replicaRouting.currentNodeId(),
486-
null,
487-
true,
488-
ShardRoutingState.STARTED,
489-
replicaRouting.allocationId());
490-
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
491-
0L, Collections.singleton(primaryRouting.allocationId().getId()),
492-
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet());
468+
promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
469+
new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());
493470

494471
/*
495472
* This operation completing means that the delay operation executed as part of increasing the primary term has completed and the
@@ -506,7 +483,7 @@ public void onResponse(Releasable releasable) {
506483

507484
@Override
508485
public void onFailure(Exception e) {
509-
throw new RuntimeException(e);
486+
throw new AssertionError(e);
510487
}
511488
},
512489
ThreadPool.Names.GENERIC, "");
@@ -846,7 +823,7 @@ public void testGlobalCheckpointSync() throws IOException {
846823
// add a replica
847824
recoverShardFromStore(primaryShard);
848825
final IndexShard replicaShard = newShard(shardId, false);
849-
recoverReplica(replicaShard, primaryShard);
826+
recoverReplica(replicaShard, primaryShard, true);
850827
final int maxSeqNo = randomIntBetween(0, 128);
851828
for (int i = 0; i <= maxSeqNo; i++) {
852829
EngineTestCase.generateNewSeqNo(primaryShard.getEngine());
@@ -1625,7 +1602,7 @@ public void testPrimaryHandOffUpdatesLocalCheckpoint() throws IOException {
16251602
IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1));
16261603
final IndexShard primaryTarget = newShard(primarySource.routingEntry().getTargetRelocatingShard());
16271604
updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetaData());
1628-
recoverReplica(primaryTarget, primarySource);
1605+
recoverReplica(primaryTarget, primarySource, true);
16291606

16301607
// check that local checkpoint of new primary is properly tracked after primary relocation
16311608
assertThat(primaryTarget.getLocalCheckpoint(), equalTo(totalOps - 1L));
@@ -2082,7 +2059,7 @@ public long indexTranslogOperations(List<Translog.Operation> operations, int tot
20822059
assertFalse(replica.isSyncNeeded());
20832060
return localCheckpoint;
20842061
}
2085-
}, true);
2062+
}, true, true);
20862063

20872064
closeShards(primary, replica);
20882065
}
@@ -2189,7 +2166,7 @@ public long indexTranslogOperations(List<Translog.Operation> operations, int tot
21892166
assertTrue(replica.isActive());
21902167
return localCheckpoint;
21912168
}
2192-
}, false);
2169+
}, false, true);
21932170

21942171
closeShards(primary, replica);
21952172
}
@@ -2241,7 +2218,7 @@ public void finalizeRecovery(long globalCheckpoint) throws IOException {
22412218
super.finalizeRecovery(globalCheckpoint);
22422219
assertListenerCalled.accept(replica);
22432220
}
2244-
}, false);
2221+
}, false, true);
22452222

22462223
closeShards(primary, replica);
22472224
}

server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java

+8
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,14 @@ public void updateShardState(ShardRouting shardRouting,
357357
assertTrue("and active shard must stay active, current: " + this.shardRouting + ", got: " + shardRouting,
358358
shardRouting.active());
359359
}
360+
if (this.shardRouting.primary()) {
361+
assertTrue("a primary shard can't be demoted", shardRouting.primary());
362+
} else if (shardRouting.primary()) {
363+
// note: it's ok for a replica in post recovery to be started and promoted at once
364+
// this can happen when the primary failed after we sent the start shard message
365+
assertTrue("a replica can only be promoted when active. current: " + this.shardRouting + " new: " + shardRouting,
366+
shardRouting.active());
367+
}
360368
this.shardRouting = shardRouting;
361369
if (shardRouting.primary()) {
362370
term = newPrimaryTerm;

server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void testGetStartingSeqNo() throws Exception {
4343
try {
4444
// Empty store
4545
{
46-
recoveryEmptyReplica(replica);
46+
recoveryEmptyReplica(replica, true);
4747
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
4848
assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L));
4949
recoveryTarget.decRef();

server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception {
261261
}
262262
IndexShard replicaShard = newShard(primaryShard.shardId(), false);
263263
updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetaData());
264-
recoverReplica(replicaShard, primaryShard);
264+
recoverReplica(replicaShard, primaryShard, true);
265265
List<IndexCommit> commits = DirectoryReader.listCommits(replicaShard.store().directory());
266266
long maxSeqNo = Long.parseLong(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
267267
assertThat(maxSeqNo, lessThanOrEqualTo(globalCheckpoint));

test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP
265265
RecoverySource.PeerRecoverySource.INSTANCE);
266266

267267
final IndexShard newReplica =
268-
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
268+
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
269269
replicas.add(newReplica);
270270
updateAllocationIDsOnPrimary();
271271
return newReplica;
@@ -341,8 +341,11 @@ public void recoverReplica(
341341
IndexShard replica,
342342
BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
343343
boolean markAsRecovering) throws IOException {
344-
ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering, activeIds(),
345-
routingTable(Function.identity()));
344+
final IndexShardRoutingTable routingTable = routingTable(Function.identity());
345+
final Set<String> inSyncIds = activeIds();
346+
ESIndexLevelReplicationTestCase.this.recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds,
347+
routingTable);
348+
ESIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable);
346349
}
347350

348351
public synchronized DiscoveryNode getPrimaryNode() {

0 commit comments

Comments
 (0)