Skip to content

Avoid unnecessary persistence of retention leases #42299

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand Down Expand Up @@ -181,6 +180,18 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;

/**
* The primary term of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
* leases.
*/
private long persistedRetentionLeasesPrimaryTerm;

/**
* The version of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
* leases.
*/
private long persistedRetentionLeasesVersion;

/**
* Get all retention leases tracked on this shard.
*
Expand Down Expand Up @@ -343,7 +354,8 @@ public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
private final Object retentionLeasePersistenceLock = new Object();

/**
* Persists the current retention leases to their dedicated state file.
* Persists the current retention leases to their dedicated state file. If this version of the retention leases are already persisted
* then persistence is skipped.
*
* @param path the path to the directory containing the state file
* @throws WriteStateException if an exception occurs writing the state file
Expand All @@ -352,10 +364,16 @@ public void persistRetentionLeases(final Path path) throws WriteStateException {
synchronized (retentionLeasePersistenceLock) {
final RetentionLeases currentRetentionLeases;
synchronized (this) {
if (retentionLeases.supersedes(persistedRetentionLeasesPrimaryTerm, persistedRetentionLeasesVersion) == false) {
logger.trace("skipping persisting retention leases [{}], already persisted", retentionLeases);
return;
}
currentRetentionLeases = retentionLeases;
}
logger.trace("persisting retention leases [{}]", currentRetentionLeases);
RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path);
persistedRetentionLeasesPrimaryTerm = currentRetentionLeases.primaryTerm();
persistedRetentionLeasesVersion = currentRetentionLeases.version();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,27 @@ public long version() {

/**
* Checks if this retention leases collection supersedes the specified retention leases collection. A retention leases collection
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher.
*
* @param that the retention leases collection to test against
* @return true if this retention leases collection supercedes the specified retention lease collection, otherwise false
*/
public boolean supersedes(final RetentionLeases that) {
return primaryTerm > that.primaryTerm || primaryTerm == that.primaryTerm && version > that.version;
boolean supersedes(final RetentionLeases that) {
return supersedes(that.primaryTerm, that.version);
}

/**
* Checks if this retention leases collection would supersede a retention leases collection with the specified primary term and version.
* A retention leases collection supersedes another retention leases collection if its primary term is higher, or if for equal primary
* terms its version is higher.
*
* @param primaryTerm the primary term
* @param version the version
* @return true if this retention leases collection would supercedes a retention lease collection with the specified primary term and
* version
*/
boolean supersedes(final long primaryTerm, final long version) {
return this.primaryTerm > primaryTerm || this.primaryTerm == primaryTerm && this.version > version;
}

private final Map<String, RetentionLease> leases;
Expand Down Expand Up @@ -203,7 +217,7 @@ public static RetentionLeases fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}

static final MetaDataStateFormat<RetentionLeases> FORMAT = new MetaDataStateFormat<RetentionLeases>("retention-leases-") {
static final MetaDataStateFormat<RetentionLeases> FORMAT = new MetaDataStateFormat<>("retention-leases-") {

@Override
public void toXContent(final XContentBuilder builder, final RetentionLeases retentionLeases) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -489,6 +490,48 @@ public void testLoadAndPersistRetentionLeases() throws IOException {
assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
}

public void testUnnecessaryPersistenceOfRetentionLeases() throws IOException {
final AllocationId allocationId = AllocationId.newInitializing();
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
primaryTerm,
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId));
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
for (int i = 0; i < length; i++) {
if (rarely() && primaryTerm < Long.MAX_VALUE) {
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
replicationTracker.setOperationPrimaryTerm(primaryTerm);
}
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addRetentionLease(
Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {}));
}

final Path path = createTempDir();
replicationTracker.persistRetentionLeases(path);

final Tuple<RetentionLeases, Long> retentionLeasesWithGeneration =
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);

replicationTracker.persistRetentionLeases(path);
final Tuple<RetentionLeases, Long> retentionLeasesWithGenerationAfterUnnecessaryPersistence =
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);

assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v1(), equalTo(retentionLeasesWithGeneration.v1()));
assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v2(), equalTo(retentionLeasesWithGeneration.v2()));
}

/**
* Test that we correctly synchronize writing the retention lease state file in {@link ReplicationTracker#persistRetentionLeases(Path)}.
* This test can fail without the synchronization block in that method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public void testSupersedesByPrimaryTerm() {
final long higherPrimaryTerm = randomLongBetween(lowerPrimaryTerm + 1, Long.MAX_VALUE);
final RetentionLeases right = new RetentionLeases(higherPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList());
assertTrue(right.supersedes(left));
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
assertFalse(left.supersedes(right));
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
}

public void testSupersedesByVersion() {
Expand All @@ -70,7 +72,9 @@ public void testSupersedesByVersion() {
final RetentionLeases left = new RetentionLeases(primaryTerm, lowerVersion, Collections.emptyList());
final RetentionLeases right = new RetentionLeases(primaryTerm, higherVersion, Collections.emptyList());
assertTrue(right.supersedes(left));
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
assertFalse(left.supersedes(right));
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
}

public void testRetentionLeasesRejectsDuplicates() {
Expand Down