Commit d233bb0
Create missing PRRLs after primary activation
Today peer recovery retention leases (PRRLs) are created when starting a replication group from scratch and during peer recovery. However, if the replication group was migrated from nodes running a version which does not create PRRLs (e.g. 7.3 and earlier) then it's possible that the primary was relocated or promoted without first establishing all the expected leases.

It's not possible to establish these leases before or during primary activation, so we must create them as soon as possible afterwards. This gives weaker guarantees about history retention, since there's a possibility that history will be discarded before it can be used. In practice such situations are expected to occur only rarely.

This commit adds the machinery to create missing leases after primary activation, and strengthens the assertions about the existence of such leases in order to ensure that once all the leases do exist we never again enter a state where there's a missing lease.

Relates #41536
1 parent 5dd6c68 · commit d233bb0
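In outline, the new machinery has the freshly activated primary walk its tracked shard copies and take out a peer recovery retention lease for every copy that lacks one, flipping the internal `hasAllPeerRecoveryRetentionLeases` flag only once every tracked copy is covered. Below is a minimal stand-alone sketch of that per-copy decision, using hypothetical stand-in bookkeeping (plain maps and sets, and a made-up lease-id scheme) rather than the real `ReplicationTracker` state:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the primary's per-copy bookkeeping: which copies
// are tracked, and which peer recovery retention leases currently exist.
public class MissingLeaseSketch {

    // Assumption for illustration only: leases are keyed by the node holding the copy.
    private static String leaseIdFor(String nodeId) {
        return "peer_recovery/" + nodeId;
    }

    public static void main(String[] args) {
        // tracked flag per copy, keyed by the node holding it
        Map<String, Boolean> trackedCopies = new HashMap<>();
        trackedCopies.put("node-0", true);  // the promoted primary: already holds a lease
        trackedCopies.put("node-1", true);  // tracked replica migrated from 7.3: lease missing
        trackedCopies.put("node-2", false); // untracked copy: no lease expected yet

        Set<String> existingLeases = new HashSet<>();
        existingLeases.add(leaseIdFor("node-0"));

        // After primary activation: create a lease for every tracked copy that lacks one.
        for (Map.Entry<String, Boolean> copy : trackedCopies.entrySet()) {
            boolean tracked = copy.getValue();
            String leaseId = leaseIdFor(copy.getKey());
            if (tracked && existingLeases.contains(leaseId) == false) {
                existingLeases.add(leaseId); // stands in for the asynchronous lease-creation call
                System.out.println("created missing lease " + leaseId);
            }
        }
        // Only now may hasAllPeerRecoveryRetentionLeases flip from false to true, and the
        // commit's strengthened assertions keep it true from then on.
    }
}
```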

File tree

4 files changed: +301 −16 lines changed

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java (+115)
```diff
@@ -26,11 +26,16 @@
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.common.Booleans;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.seqno.ReplicationTracker;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.action.document.RestIndexAction;
 import org.elasticsearch.rest.action.document.RestUpdateAction;
@@ -40,10 +45,12 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
@@ -53,6 +60,7 @@
 import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING;
 import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isIn;
@@ -382,6 +390,113 @@ public void testRecoveryWithSoftDeletes() throws Exception {
         ensureGreen(index);
     }
 
+    public void testRetentionLeasesEstablishedWhenPromotingPrimary() throws Exception {
+        final String index = "recover_and_create_leases_in_promotion";
+        if (CLUSTER_TYPE == ClusterType.OLD) {
+            Settings.Builder settings = Settings.builder()
+                .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5))
+                .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(1, 2)) // triggers nontrivial promotion
+                .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
+                .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
+            createIndex(index, settings.build());
+            int numDocs = randomInt(10);
+            indexDocs(index, 0, numDocs);
+            if (randomBoolean()) {
+                client().performRequest(new Request("POST", "/" + index + "/_flush"));
+            }
+        }
+        ensureGreen(index);
+        if (CLUSTER_TYPE == ClusterType.UPGRADED) {
+            assertAllCopiesHaveRetentionLeases(index);
+        }
+    }
+
+    public void testRetentionLeasesEstablishedWhenRelocatingPrimary() throws Exception {
+        final String index = "recover_and_create_leases_in_relocation";
+        switch (CLUSTER_TYPE) {
+            case OLD:
+                Settings.Builder settings = Settings.builder()
+                    .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5))
+                    .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
+                    .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
+                    .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
+                    .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
+                createIndex(index, settings.build());
+                int numDocs = randomInt(10);
+                indexDocs(index, 0, numDocs);
+                if (randomBoolean()) {
+                    client().performRequest(new Request("POST", "/" + index + "/_flush"));
+                }
+                ensureGreen(index);
+                break;
+
+            case MIXED:
+                // trigger a primary relocation by excluding the last old node with a shard filter
+                final Map<?, ?> nodesMap
+                    = ObjectPath.createFromResponse(client().performRequest(new Request("GET", "/_nodes"))).evaluate("nodes");
+                final List<String> oldNodeNames = new ArrayList<>();
+                for (Object nodeDetails : nodesMap.values()) {
+                    final Map<?, ?> nodeDetailsMap = (Map<?, ?>) nodeDetails;
+                    final String versionString = (String) nodeDetailsMap.get("version");
+                    if (versionString.equals(Version.CURRENT.toString()) == false) {
+                        oldNodeNames.add((String) nodeDetailsMap.get("name"));
+                    }
+                }
+
+                if (oldNodeNames.size() == 1) {
+                    final String oldNodeName = oldNodeNames.get(0);
+                    logger.info("--> excluding index [{}] from node [{}]", index, oldNodeName);
+                    final Request putSettingsRequest = new Request("PUT", "/" + index + "/_settings");
+                    putSettingsRequest.setJsonEntity("{\"index.routing.allocation.exclude._name\":\"" + oldNodeName + "\"}");
+                    assertOK(client().performRequest(putSettingsRequest));
+                    ensureGreen(index);
+                    assertAllCopiesHaveRetentionLeases(index);
+                } else {
+                    ensureGreen(index);
+                }
+                break;
+
+            case UPGRADED:
+                ensureGreen(index);
+                assertAllCopiesHaveRetentionLeases(index);
+                break;
+        }
+    }
+
+    private void assertAllCopiesHaveRetentionLeases(String index) throws Exception {
+        assertBusy(() -> {
+            final Request statsRequest = new Request("GET", "/" + index + "/_stats");
+            statsRequest.addParameter("level", "shards");
+            final Map<?, ?> shardsStats = ObjectPath.createFromResponse(client().performRequest(statsRequest))
+                .evaluate("indices." + index + ".shards");
+            for (Map.Entry<?, ?> shardCopiesEntry : shardsStats.entrySet()) {
+                final List<?> shardCopiesList = (List<?>) shardCopiesEntry.getValue();
+
+                final Set<String> expectedLeaseIds = new HashSet<>();
+                for (Object shardCopyStats : shardCopiesList) {
+                    final String nodeId
+                        = Objects.requireNonNull((String) ((Map<?, ?>) (((Map<?, ?>) shardCopyStats).get("routing"))).get("node"));
+                    expectedLeaseIds.add(ReplicationTracker.getPeerRecoveryRetentionLeaseId(
+                        ShardRouting.newUnassigned(new ShardId("_na_", "test", 0), false, RecoverySource.PeerRecoverySource.INSTANCE,
+                            new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test")).initialize(nodeId, null, 0L)));
+                }
+
+                final Set<String> actualLeaseIds = new HashSet<>();
+                for (Object shardCopyStats : shardCopiesList) {
+                    final List<?> leases
+                        = (List<?>) ((Map<?, ?>) (((Map<?, ?>) shardCopyStats).get("retention_leases"))).get("leases");
+                    for (Object lease : leases) {
+                        actualLeaseIds.add(Objects.requireNonNull((String) (((Map<?, ?>) lease).get("id"))));
+                    }
+                }
+                assertThat("[" + index + "][" + shardCopiesEntry.getKey() + "] has leases " + actualLeaseIds
+                        + " but expected " + expectedLeaseIds,
+                    actualLeaseIds, hasItems(expectedLeaseIds.toArray(new String[0])));
+            }
+        });
+    }
+
     /**
      * This test creates an index in the non upgraded cluster and closes it. It then checks that the index
      * is effectively closed and potentially replicated (if the version the index was created on supports
```
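A note on how `assertAllCopiesHaveRetentionLeases` builds its expected set: the lease id depends only on the node a copy is assigned to, so the test constructs a throwaway unassigned `ShardRouting`, initializes it onto each node id found in the stats response, and passes it to `ReplicationTracker.getPeerRecoveryRetentionLeaseId`. The following is a condensed stand-alone sketch of that expected-versus-actual check, with the id scheme hard-coded as an assumption rather than taken from the real `ReplicationTracker`:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LeaseAssertionSketch {

    // Assumption for illustration: the lease id is derived from the node id alone.
    private static String peerRecoveryRetentionLeaseId(String nodeId) {
        return "peer_recovery/" + nodeId;
    }

    public static void main(String[] args) {
        // Node ids as they would be read from the "routing" section of shard-level stats.
        List<String> nodeIds = Arrays.asList("node-0", "node-1");

        Set<String> expectedLeaseIds = new HashSet<>();
        for (String nodeId : nodeIds) {
            expectedLeaseIds.add(peerRecoveryRetentionLeaseId(nodeId));
        }

        // Lease ids as they would be read from the "retention_leases" section of the same stats;
        // hard-coded here for illustration.
        Set<String> actualLeaseIds = new HashSet<>(
            Arrays.asList("peer_recovery/node-0", "peer_recovery/node-1"));

        // The test's hasItems(...) assertion amounts to a containsAll check per shard copy.
        if (actualLeaseIds.containsAll(expectedLeaseIds) == false) {
            throw new AssertionError("has leases " + actualLeaseIds + " but expected " + expectedLeaseIds);
        }
        System.out.println("every expected peer recovery retention lease is present");
    }
}
```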

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java (+92 −16)
```diff
@@ -23,6 +23,7 @@
 import com.carrotsearch.hppc.ObjectLongMap;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.AllocationId;
@@ -201,6 +202,14 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements LongSupplier {
      */
     private long persistedRetentionLeasesVersion;
 
+    /**
+     * Whether there should be a peer recovery retention lease (PRRL) for every tracked shard copy. Always true on indices created from
+     * {@link Version#V_7_4_0} onwards, because these versions create PRRLs properly. May be false on indices created in an earlier version
+     * if we recently did a rolling upgrade and {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)} has not
+     * yet completed. Is only permitted to change from false to true; can be removed once support for pre-PRRL indices is no longer needed.
+     */
+    private boolean hasAllPeerRecoveryRetentionLeases;
+
     /**
      * Get all retention leases tracked on this shard.
      *
@@ -486,10 +495,10 @@ public synchronized void renewPeerRecoveryRetentionLeases() {
             if (retentionLease == null) {
                 /*
                  * If this shard copy is tracked then we got here here via a rolling upgrade from an older version that doesn't
-                 * create peer recovery retention leases for every shard copy. TODO create leases lazily in that situation.
+                 * create peer recovery retention leases for every shard copy.
                  */
                 assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false
-                    || indexSettings.getIndexVersionCreated().before(Version.V_7_4_0);
+                    || hasAllPeerRecoveryRetentionLeases == false;
                 return false;
             }
             return retentionLease.timestamp() <= renewalTimeMillis
@@ -753,7 +762,7 @@ private boolean invariant() {
         if (primaryMode
             && indexSettings.isSoftDeleteEnabled()
             && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN
-            && indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0)) {
+            && hasAllPeerRecoveryRetentionLeases) {
             // all tracked shard copies have a corresponding peer-recovery retention lease
             for (final ShardRouting shardRouting : routingTable.assignedShards()) {
                 if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
@@ -820,6 +829,7 @@ public ReplicationTracker(
         this.pendingInSync = new HashSet<>();
         this.routingTable = null;
         this.replicationGroup = null;
+        this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0);
         assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
         assert invariant();
     }
@@ -914,30 +924,51 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) {
         updateGlobalCheckpointOnPrimary();
 
         if (indexSettings.isSoftDeleteEnabled()) {
+            addPeerRecoveryRetentionLeaseForSolePrimary();
+        }
+
+        assert invariant();
+    }
+
+    /**
+     * Creates a peer recovery retention lease for this shard, if one does not already exist and this shard is the sole shard copy in the
+     * replication group. If one does not already exist and yet there are other shard copies in this group then we must have just done
+     * a rolling upgrade from a version before {@link Version#V_7_4_0}, in which case the missing leases should be created asynchronously
+     * by the caller using {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)}.
+     */
+    private void addPeerRecoveryRetentionLeaseForSolePrimary() {
+        assert primaryMode;
+        assert Thread.holdsLock(this);
+
+        if (indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN) {
             final ShardRouting primaryShard = routingTable.primaryShard();
             final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
             if (retentionLeases.get(leaseId) == null) {
-                /*
-                 * We might have got here here via a rolling upgrade from an older version that doesn't create peer recovery retention
-                 * leases for every shard copy, but in this case we do not expect any leases to exist.
-                 */
-                if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0)) {
-                    // We are starting up the whole replication group from scratch: if we were not (i.e. this is a replica promotion) then
-                    // this copy must already be in-sync and active and therefore holds a retention lease for itself.
-                    assert routingTable.activeShards().equals(Collections.singletonList(primaryShard)) : routingTable.activeShards();
+                if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) {
                     assert primaryShard.allocationId().getId().equals(shardAllocationId)
-                        : routingTable.activeShards() + " vs " + shardAllocationId;
-                    assert replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard));
-
+                        : routingTable.assignedShards() + " vs " + shardAllocationId;
                     // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
                     // group.
+                    logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId);
                     innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1),
                         PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+                    hasAllPeerRecoveryRetentionLeases = true;
+                } else {
+                    /*
+                     * We got here here via a rolling upgrade from an older version that doesn't create peer recovery retention
+                     * leases for every shard copy, but in this case we do not expect any leases to exist.
+                     */
+                    assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases;
+                    logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases);
                 }
+            } else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting ->
+                retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
+                    || checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) {
+                // Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we
+                // don't need to do any more work.
+                hasAllPeerRecoveryRetentionLeases = true;
             }
         }
-
-        assert invariant();
     }
 
     /**
@@ -1240,9 +1271,54 @@ public synchronized void activateWithPrimaryContext(PrimaryContext primaryContext) {
         // note that if there was no cluster state update between start of the engine of this shard and the call to
         // initializeWithPrimaryContext, we might still have missed a cluster state update. This is best effort.
         runAfter.run();
+
+        if (indexSettings.isSoftDeleteEnabled()) {
+            addPeerRecoveryRetentionLeaseForSolePrimary();
+        }
+
+        assert invariant();
+    }
+
+    private synchronized void setHasAllPeerRecoveryRetentionLeases() {
+        hasAllPeerRecoveryRetentionLeases = true;
         assert invariant();
     }
 
+    /**
+     * Create any required peer-recovery retention leases that do not currently exist because we just did a rolling upgrade from a version
+     * prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases.
+     */
+    public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
+        if (hasAllPeerRecoveryRetentionLeases == false) {
+            final List<ShardRouting> shardRoutings = routingTable.assignedShards();
+            final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
+                setHasAllPeerRecoveryRetentionLeases();
+                listener.onResponse(null);
+            }, listener::onFailure), shardRoutings.size());
+            for (ShardRouting shardRouting : shardRoutings) {
+                if (retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))) {
+                    groupedActionListener.onResponse(null);
+                } else {
+                    final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
+                    if (checkpointState.tracked == false) {
+                        groupedActionListener.onResponse(null);
+                    } else {
+                        logger.trace("createMissingPeerRecoveryRetentionLeases: adding missing lease for {}", shardRouting);
+                        try {
+                            addPeerRecoveryRetentionLease(shardRouting.currentNodeId(),
+                                Math.max(SequenceNumbers.NO_OPS_PERFORMED, checkpointState.globalCheckpoint), groupedActionListener);
+                        } catch (Exception e) {
+                            groupedActionListener.onFailure(e);
+                        }
+                    }
+                }
+            }
+        } else {
+            logger.trace("createMissingPeerRecoveryRetentionLeases: nothing to do");
+            listener.onResponse(null);
+        }
+    }
+
     private Runnable getMasterUpdateOperationFromCurrentState() {
         assert primaryMode == false;
         final long lastAppliedClusterStateVersion = appliedClusterStateVersion;
```
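`createMissingPeerRecoveryRetentionLeases` fans out one asynchronous lease-creation per tracked, lease-less copy and joins the results with a `GroupedActionListener`, completing the caller's listener (and flipping the flag) only once every copy has reported in. Here is a minimal stand-alone sketch of that join pattern, using a hypothetical counter-based callback in place of Elasticsearch's `GroupedActionListener`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the grouped-listener join: the completion action
// runs exactly once, after all expected per-copy operations have succeeded.
class GroupedCallback {
    private final AtomicInteger pending;
    private final AtomicBoolean failed = new AtomicBoolean();
    private final Runnable onAllDone;

    GroupedCallback(int groupSize, Runnable onAllDone) {
        this.pending = new AtomicInteger(groupSize);
        this.onAllDone = onAllDone;
    }

    void onResponse() {
        if (pending.decrementAndGet() == 0 && failed.get() == false) {
            onAllDone.run(); // e.g. setHasAllPeerRecoveryRetentionLeases()
        }
    }

    void onFailure(Exception e) {
        failed.set(true); // the real listener would propagate e to the caller instead
    }
}

public class GroupedCallbackDemo {
    public static void main(String[] args) {
        List<String> shardCopies = Arrays.asList("copy-0", "copy-1", "copy-2");
        GroupedCallback group = new GroupedCallback(shardCopies.size(),
            () -> System.out.println("all missing peer recovery retention leases created"));
        for (String copy : shardCopies) {
            // Copies that already hold a lease (or are untracked) complete immediately;
            // the rest would complete from their async lease-creation callbacks.
            group.onResponse();
        }
    }
}
```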
