
Commit 4b19a4b

Create missing PRRLs after primary activation (#44009)
Today, peer recovery retention leases (PRRLs) are created when starting a replication group from scratch and during peer recovery. However, if the replication group was migrated from nodes running a version which does not create PRRLs (e.g. 7.3 and earlier) then it's possible that the primary was relocated or promoted without first establishing all the expected leases.

It's not possible to establish these leases before or during primary activation, so we must create them as soon as possible afterwards. This gives weaker guarantees about history retention, since there's a possibility that history will be discarded before it can be used. In practice such situations are expected to occur only rarely.

This commit adds the machinery to create missing leases after primary activation, and strengthens the assertions about the existence of such leases in order to ensure that once all the leases do exist we never again enter a state where one is missing.

Relates #41536
1 parent 59a6830 · commit 4b19a4b
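In outline, the change wires lease creation into primary activation in two steps. The following condensed sketch is stitched together from the IndexShard and ReplicationTracker diffs below; it is a simplified view, not a verbatim excerpt:

    // Step 1: on activation the tracker adds a lease synchronously, but only in the
    // sole-copy case (see addPeerRecoveryRetentionLeaseForSolePrimary below).
    replicationTracker.activatePrimaryMode(getLocalCheckpoint());
    // Step 2: any leases that a pre-7.4 replication group failed to establish are
    // created asynchronously, off the cluster-state update path.
    threadPool.generic().execute(() -> replicationTracker.createMissingPeerRecoveryRetentionLeases(ActionListener.wrap(
        r -> logger.trace("created missing peer recovery retention leases"),
        e -> logger.debug("failed creating missing peer recovery retention leases", e))));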

6 files changed: +336 -17 lines changed

qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java (+24 -1)
@@ -35,6 +35,7 @@
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.index.seqno.RetentionLeaseUtils;
 import org.elasticsearch.rest.action.document.RestBulkAction;
 import org.elasticsearch.rest.action.document.RestGetAction;
 import org.elasticsearch.rest.action.document.RestIndexAction;
@@ -89,7 +90,7 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
     private String type;

     @Before
-    public void setIndex() throws IOException {
+    public void setIndex() {
         index = getTestName().toLowerCase(Locale.ROOT);
     }

@@ -1338,4 +1339,26 @@ protected void ensureGreenLongWait(String index) throws IOException {
         assertEquals("green", healthRsp.get("status"));
         assertFalse((Boolean) healthRsp.get("timed_out"));
     }
+
+    public void testPeerRecoveryRetentionLeases() throws IOException {
+        if (isRunningAgainstOldCluster()) {
+            XContentBuilder settings = jsonBuilder();
+            settings.startObject();
+            {
+                settings.startObject("settings");
+                settings.field("number_of_shards", between(1, 5));
+                settings.field("number_of_replicas", between(0, 2));
+                settings.endObject();
+            }
+            settings.endObject();
+
+            Request createIndex = new Request("PUT", "/" + index);
+            createIndex.setJsonEntity(Strings.toString(settings));
+            client().performRequest(createIndex);
+            ensureGreen(index);
+        } else {
+            ensureGreen(index);
+            RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index);
+        }
+    }
 }
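RetentionLeaseUtils is one of the changed files not shown in this excerpt, so the assertion used above is only named here. A plausible sketch of what it checks, assuming the 7.x shard-level stats format and the "peer recovery" lease source string from ReplicationTracker; the method name and field paths are illustrative, not taken from this diff:

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;

    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.test.rest.yaml.ObjectPath;

    // Fetch shard-level stats and require a "peer recovery" lease on every shard copy.
    static void assertAllCopiesHaveLeases(RestClient client, String index) throws IOException {
        final Request statsRequest = new Request("GET", "/" + index + "/_stats");
        statsRequest.addParameter("level", "shards");
        final Map<?, ?> shards = ObjectPath.createFromResponse(client.performRequest(statsRequest))
            .evaluate("indices." + index + ".shards");
        for (final Object copies : shards.values()) {
            for (final Object copy : (List<?>) copies) {
                final Map<?, ?> retentionLeases = (Map<?, ?>) ((Map<?, ?>) copy).get("retention_leases");
                final List<?> leases = (List<?>) retentionLeases.get("leases");
                if (leases.stream().noneMatch(lease -> "peer recovery".equals(((Map<?, ?>) lease).get("source")))) {
                    throw new AssertionError("copy of [" + index + "] has no peer recovery retention lease: " + copy);
                }
            }
        }
    }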

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java (+75)
@@ -31,6 +31,7 @@
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.seqno.RetentionLeaseUtils;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.action.document.RestIndexAction;
 import org.elasticsearch.rest.action.document.RestUpdateAction;
@@ -382,6 +383,80 @@ public void testRecoveryWithSoftDeletes() throws Exception {
         ensureGreen(index);
     }

+    public void testRetentionLeasesEstablishedWhenPromotingPrimary() throws Exception {
+        final String index = "recover_and_create_leases_in_promotion";
+        if (CLUSTER_TYPE == ClusterType.OLD) {
+            Settings.Builder settings = Settings.builder()
+                .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5))
+                .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(1, 2)) // triggers nontrivial promotion
+                .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
+                .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
+            createIndex(index, settings.build());
+            int numDocs = randomInt(10);
+            indexDocs(index, 0, numDocs);
+            if (randomBoolean()) {
+                client().performRequest(new Request("POST", "/" + index + "/_flush"));
+            }
+        }
+        ensureGreen(index);
+        if (CLUSTER_TYPE == ClusterType.UPGRADED) {
+            assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index));
+        }
+    }
+
+    public void testRetentionLeasesEstablishedWhenRelocatingPrimary() throws Exception {
+        final String index = "recover_and_create_leases_in_relocation";
+        switch (CLUSTER_TYPE) {
+            case OLD:
+                Settings.Builder settings = Settings.builder()
+                    .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5))
+                    .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(0, 1))
+                    .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
+                    .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
+                    .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
+                createIndex(index, settings.build());
+                int numDocs = randomInt(10);
+                indexDocs(index, 0, numDocs);
+                if (randomBoolean()) {
+                    client().performRequest(new Request("POST", "/" + index + "/_flush"));
+                }
+                ensureGreen(index);
+                break;

+            case MIXED:
+                // trigger a primary relocation by excluding the last old node with a shard filter
+                final Map<?, ?> nodesMap
+                    = ObjectPath.createFromResponse(client().performRequest(new Request("GET", "/_nodes"))).evaluate("nodes");
+                final List<String> oldNodeNames = new ArrayList<>();
+                for (Object nodeDetails : nodesMap.values()) {
+                    final Map<?, ?> nodeDetailsMap = (Map<?, ?>) nodeDetails;
+                    final String versionString = (String) nodeDetailsMap.get("version");
+                    if (versionString.equals(Version.CURRENT.toString()) == false) {
+                        oldNodeNames.add((String) nodeDetailsMap.get("name"));
+                    }
+                }

+                if (oldNodeNames.size() == 1) {
+                    final String oldNodeName = oldNodeNames.get(0);
+                    logger.info("--> excluding index [{}] from node [{}]", index, oldNodeName);
+                    final Request putSettingsRequest = new Request("PUT", "/" + index + "/_settings");
+                    putSettingsRequest.setJsonEntity("{\"index.routing.allocation.exclude._name\":\"" + oldNodeName + "\"}");
+                    assertOK(client().performRequest(putSettingsRequest));
+                    ensureGreen(index);
+                    assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index));
+                } else {
+                    ensureGreen(index);
+                }
+                break;

+            case UPGRADED:
+                ensureGreen(index);
+                assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index));
+                break;
+        }
+    }
+
     /**
      * This test creates an index in the non upgraded cluster and closes it. It then checks that the index
      * is effectively closed and potentially replicated (if the version the index was created on supports
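For reference when reading these assertions: each tracked copy's PRRL is keyed by the node holding that copy, and all PRRLs share a single source string. These two members of ReplicationTracker are reproduced here from the 7.x sources rather than from this diff, so treat the exact signatures as approximate:

    public static final String PEER_RECOVERY_RETENTION_LEASE_SOURCE = "peer recovery";

    static String getPeerRecoveryRetentionLeaseId(String nodeId) {
        return "peer_recovery/" + nodeId;
    }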

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java (+92 -16)
@@ -23,6 +23,7 @@
 import com.carrotsearch.hppc.ObjectLongMap;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.AllocationId;
@@ -201,6 +202,14 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
      */
     private long persistedRetentionLeasesVersion;

+    /**
+     * Whether there should be a peer recovery retention lease (PRRL) for every tracked shard copy. Always true on indices created from
+     * {@link Version#V_7_4_0} onwards, because these versions create PRRLs properly. May be false on indices created in an earlier version
+     * if we recently did a rolling upgrade and {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)} has not
+     * yet completed. Is only permitted to change from false to true; can be removed once support for pre-PRRL indices is no longer needed.
+     */
+    private boolean hasAllPeerRecoveryRetentionLeases;
+
     /**
      * Get all retention leases tracked on this shard.
      *
@@ -486,10 +495,10 @@ public synchronized void renewPeerRecoveryRetentionLeases() {
             if (retentionLease == null) {
                 /*
                  * If this shard copy is tracked then we got here via a rolling upgrade from an older version that doesn't
-                 * create peer recovery retention leases for every shard copy. TODO create leases lazily in that situation.
+                 * create peer recovery retention leases for every shard copy.
                  */
                 assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false
-                    || indexSettings.getIndexVersionCreated().before(Version.V_7_4_0);
+                    || hasAllPeerRecoveryRetentionLeases == false;
                 return false;
             }
             return retentionLease.timestamp() <= renewalTimeMillis
@@ -752,7 +761,7 @@ private boolean invariant() {
         if (primaryMode
             && indexSettings.isSoftDeleteEnabled()
             && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN
-            && indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0)) {
+            && hasAllPeerRecoveryRetentionLeases) {
             // all tracked shard copies have a corresponding peer-recovery retention lease
             for (final ShardRouting shardRouting : routingTable.assignedShards()) {
                 if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
@@ -819,6 +828,7 @@ public ReplicationTracker(
         this.pendingInSync = new HashSet<>();
         this.routingTable = null;
         this.replicationGroup = null;
+        this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0);
         assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
         assert invariant();
     }
@@ -913,30 +923,51 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) {
         updateGlobalCheckpointOnPrimary();

         if (indexSettings.isSoftDeleteEnabled()) {
+            addPeerRecoveryRetentionLeaseForSolePrimary();
+        }
+
+        assert invariant();
+    }
+
+    /**
+     * Creates a peer recovery retention lease for this shard, if one does not already exist and this shard is the sole shard copy in the
+     * replication group. If one does not already exist and yet there are other shard copies in this group then we must have just done
+     * a rolling upgrade from a version before {@link Version#V_7_4_0}, in which case the missing leases should be created asynchronously
+     * by the caller using {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)}.
+     */
+    private void addPeerRecoveryRetentionLeaseForSolePrimary() {
+        assert primaryMode;
+        assert Thread.holdsLock(this);
+
+        if (indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN) {
             final ShardRouting primaryShard = routingTable.primaryShard();
             final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
             if (retentionLeases.get(leaseId) == null) {
-                /*
-                 * We might have got here here via a rolling upgrade from an older version that doesn't create peer recovery retention
-                 * leases for every shard copy, but in this case we do not expect any leases to exist.
-                 */
-                if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0)) {
-                    // We are starting up the whole replication group from scratch: if we were not (i.e. this is a replica promotion) then
-                    // this copy must already be in-sync and active and therefore holds a retention lease for itself.
-                    assert routingTable.activeShards().equals(Collections.singletonList(primaryShard)) : routingTable.activeShards();
+                if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) {
                     assert primaryShard.allocationId().getId().equals(shardAllocationId)
-                        : routingTable.activeShards() + " vs " + shardAllocationId;
-                    assert replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard));
-
+                        : routingTable.assignedShards() + " vs " + shardAllocationId;
                     // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
                     // group.
+                    logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId);
                     innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1),
                         PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+                    hasAllPeerRecoveryRetentionLeases = true;
+                } else {
+                    /*
+                     * We got here via a rolling upgrade from an older version that doesn't create peer recovery retention
+                     * leases for every shard copy, but in this case we do not expect any leases to exist.
+                     */
+                    assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases;
+                    logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases);
                 }
+            } else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting ->
+                retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
+                    || checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) {
+                // Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we
+                // don't need to do any more work.
+                hasAllPeerRecoveryRetentionLeases = true;
             }
         }
-
-        assert invariant();
     }

     /**
@@ -1239,9 +1270,54 @@ public synchronized void activateWithPrimaryContext(PrimaryContext primaryContex
         // note that if there was no cluster state update between start of the engine of this shard and the call to
         // initializeWithPrimaryContext, we might still have missed a cluster state update. This is best effort.
         runAfter.run();
+
+        if (indexSettings.isSoftDeleteEnabled()) {
+            addPeerRecoveryRetentionLeaseForSolePrimary();
+        }
+
+        assert invariant();
+    }
+
+    private synchronized void setHasAllPeerRecoveryRetentionLeases() {
+        hasAllPeerRecoveryRetentionLeases = true;
         assert invariant();
     }

+    /**
+     * Create any required peer-recovery retention leases that do not currently exist because we just did a rolling upgrade from a version
+     * prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases.
+     */
+    public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
+        if (hasAllPeerRecoveryRetentionLeases == false) {
+            final List<ShardRouting> shardRoutings = routingTable.assignedShards();
+            final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
+                setHasAllPeerRecoveryRetentionLeases();
+                listener.onResponse(null);
+            }, listener::onFailure), shardRoutings.size());
+            for (ShardRouting shardRouting : shardRoutings) {
+                if (retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))) {
+                    groupedActionListener.onResponse(null);
+                } else {
+                    final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
+                    if (checkpointState.tracked == false) {
+                        groupedActionListener.onResponse(null);
+                    } else {
+                        logger.trace("createMissingPeerRecoveryRetentionLeases: adding missing lease for {}", shardRouting);
+                        try {
+                            addPeerRecoveryRetentionLease(shardRouting.currentNodeId(),
+                                Math.max(SequenceNumbers.NO_OPS_PERFORMED, checkpointState.globalCheckpoint), groupedActionListener);
+                        } catch (Exception e) {
+                            groupedActionListener.onFailure(e);
+                        }
+                    }
+                }
+            }
+        } else {
+            logger.trace("createMissingPeerRecoveryRetentionLeases: nothing to do");
+            listener.onResponse(null);
+        }
+    }
+
     private Runnable getMasterUpdateOperationFromCurrentState() {
         assert primaryMode == false;
         final long lastAppliedClusterStateVersion = appliedClusterStateVersion;
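createMissingPeerRecoveryRetentionLeases reports success, and flips hasAllPeerRecoveryRetentionLeases, only after every outstanding lease creation has completed; GroupedActionListener provides that fan-in. A minimal self-contained stand-in for the behaviour being relied on (simplified: the real class also collects the individual responses for its delegate):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Consumer;

    // Simplified model of org.elasticsearch.action.support.GroupedActionListener: the
    // delegate succeeds once all groupSize sub-operations have responded, and fails at
    // most once, on the first error.
    final class GroupedListener<T> {
        private final AtomicInteger pending;
        private final AtomicBoolean failed = new AtomicBoolean();
        private final Runnable onAllSucceeded;
        private final Consumer<Exception> onFirstFailure;

        GroupedListener(int groupSize, Runnable onAllSucceeded, Consumer<Exception> onFirstFailure) {
            this.pending = new AtomicInteger(groupSize);
            this.onAllSucceeded = onAllSucceeded;
            this.onFirstFailure = onFirstFailure;
        }

        void onResponse(T response) {
            if (pending.decrementAndGet() == 0 && failed.get() == false) {
                onAllSucceeded.run(); // e.g. setHasAllPeerRecoveryRetentionLeases()
            }
        }

        void onFailure(Exception e) {
            if (failed.compareAndSet(false, true)) {
                onFirstFailure.accept(e);
            }
        }
    }

Note that copies which already hold a lease, or which are not tracked, complete their slot in the group immediately, so when nothing is actually missing the listener fires synchronously.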

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java (+9)
@@ -486,6 +486,7 @@ public void updateShardState(final ShardRouting newRouting,
             if (currentRouting.initializing() && currentRouting.isRelocationTarget() == false && newRouting.active()) {
                 // the master started a recovering primary, activate primary mode.
                 replicationTracker.activatePrimaryMode(getLocalCheckpoint());
+                ensurePeerRecoveryRetentionLeasesExist();
             }
         } else {
             assert currentRouting.primary() == false : "term is only increased as part of primary promotion";
@@ -528,6 +529,7 @@ public void updateShardState(final ShardRouting newRouting,
                         assert getOperationPrimaryTerm() == newPrimaryTerm;
                         try {
                             replicationTracker.activatePrimaryMode(getLocalCheckpoint());
+                            ensurePeerRecoveryRetentionLeasesExist();
                             /*
                              * If this shard was serving as a replica shard when another shard was promoted to primary then
                              * its Lucene index was reset during the primary term transition. In particular, the Lucene index
@@ -2275,6 +2277,13 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
         synchronized (mutex) {
             replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
         }
+        ensurePeerRecoveryRetentionLeasesExist();
+    }
+
+    private void ensurePeerRecoveryRetentionLeasesExist() {
+        threadPool.generic().execute(() -> replicationTracker.createMissingPeerRecoveryRetentionLeases(ActionListener.wrap(
+            r -> logger.trace("created missing peer recovery retention leases"),
+            e -> logger.debug("failed creating missing peer recovery retention leases", e))));
     }

     /**
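A note on the dispatch in ensurePeerRecoveryRetentionLeasesExist, inferred from the surrounding code rather than stated in the commit: updateShardState runs during cluster-state application, and creating the leases both takes the ReplicationTracker's monitor and issues retention-lease sync requests to other copies, so the work is forked to the generic threadpool rather than done inline. A failure is only logged at debug level; in that case hasAllPeerRecoveryRetentionLeases simply stays false, the strengthened invariant does not yet engage, and a later primary activation can presumably retry.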
