Skip to content

Commit 1391716

Browse files
authored
ReplicationTracker.markAllocationIdAsInSync may hang if allocation is cancelled (#30316)
At the end of recovery, we mark the recovering shard as "in sync" on the primary. From this point on the primary will treat any replication failure on it as critical and will reach out to the master to fail the shard. To do so, we wait for the local checkpoint of the recovered shard to be above the global checkpoint (in order to maintain global checkpoint invariant). If the master decides to cancel the allocation of the recovering shard while we wait, the method can currently hang and fail to return. It will also ignore the interrupts that are triggered by the cancelled recovery due to the primary closing. Note that this is crucial as this method is called while holding a primary permit. Since the method never comes back, the permit is never released. The unreleased permit will then block any primary relocation *and* while the primary is trying to relocate all indexing will be blocked for 30m as it waits to acquire the missing permit.
1 parent 0d7ac9a commit 1391716

File tree

2 files changed

+27
-7
lines changed

2 files changed

+27
-7
lines changed

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

+10
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,11 @@ private boolean invariant() {
339339
"shard copy " + entry.getKey() + " is in-sync but not tracked";
340340
}
341341

342+
// all pending in sync shards are tracked
343+
for (String aId : pendingInSync) {
344+
assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked";
345+
}
346+
342347
return true;
343348
}
344349

@@ -521,6 +526,9 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion
521526
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync));
522527
}
523528
}
529+
if (removedEntries) {
530+
pendingInSync.removeIf(aId -> checkpoints.containsKey(aId) == false);
531+
}
524532
} else {
525533
for (String initializingId : initializingAllocationIds) {
526534
if (shardAllocationId.equals(initializingId) == false) {
@@ -549,6 +557,8 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion
549557
replicationGroup = calculateReplicationGroup();
550558
if (primaryMode && removedEntries) {
551559
updateGlobalCheckpointOnPrimary();
560+
// notify any waiter for local checkpoint advancement to recheck that their shard is still being tracked.
561+
notifyAllWaiters();
552562
}
553563
}
554564
assert invariant();

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,8 @@ public void testWaitForAllocationIdToBeInSync() throws Exception {
305305
final AllocationId inSyncAllocationId = AllocationId.newInitializing();
306306
final AllocationId trackingAllocationId = AllocationId.newInitializing();
307307
final ReplicationTracker tracker = newTracker(inSyncAllocationId);
308-
tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()),
308+
final long clusterStateVersion = randomNonNegativeLong();
309+
tracker.updateFromMaster(clusterStateVersion, Collections.singleton(inSyncAllocationId.getId()),
309310
routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId), emptySet());
310311
tracker.activatePrimaryMode(globalCheckpoint);
311312
final Thread thread = new Thread(() -> {
@@ -336,13 +337,22 @@ public void testWaitForAllocationIdToBeInSync() throws Exception {
336337
assertBusy(() -> assertTrue(tracker.pendingInSync.contains(trackingAllocationId.getId())));
337338
}
338339

339-
tracker.updateLocalCheckpoint(trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64));
340-
// synchronize with the waiting thread to mark that it is complete
341-
barrier.await();
342-
assertTrue(complete.get());
343-
assertTrue(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()).inSync);
340+
if (randomBoolean()) {
341+
// normal path, shard catches up
342+
tracker.updateLocalCheckpoint(trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64));
343+
// synchronize with the waiting thread to mark that it is complete
344+
barrier.await();
345+
assertTrue(complete.get());
346+
assertTrue(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()).inSync);
347+
} else {
348+
// master changes its mind and cancels the allocation
349+
tracker.updateFromMaster(clusterStateVersion + 1, Collections.singleton(inSyncAllocationId.getId()),
350+
routingTable(emptySet(), inSyncAllocationId), emptySet());
351+
barrier.await();
352+
assertTrue(complete.get());
353+
assertNull(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()));
354+
}
344355
assertFalse(tracker.pendingInSync.contains(trackingAllocationId.getId()));
345-
346356
thread.join();
347357
}
348358

0 commit comments

Comments
 (0)