Skip to content

Commit 1428a43

Browse files
committed
Avoid unnecessary persistence of retention leases (#42299)
Today we are persisting the retention leases at least every thirty seconds by a scheduled background sync. This sync causes an fsync to disk and when there are a large number of shards allocated to slow disks, these fsyncs can pile up and can severely impact the system. This commit addresses this by only persisting and fsyncing the retention leases if they have changed since the last time that we persisted and fsynced the retention leases.
1 parent 802e00a commit 1428a43

File tree

4 files changed

+85
-4
lines changed

4 files changed

+85
-4
lines changed

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,18 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
180180
*/
181181
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;
182182

183+
/**
184+
* The primary term of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
185+
* leases.
186+
*/
187+
private long persistedRetentionLeasesPrimaryTerm;
188+
189+
/**
190+
* The version of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
191+
* leases.
192+
*/
193+
private long persistedRetentionLeasesVersion;
194+
183195
/**
184196
* Get all retention leases tracked on this shard.
185197
*
@@ -342,7 +354,8 @@ public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
342354
private final Object retentionLeasePersistenceLock = new Object();
343355

344356
/**
345-
* Persists the current retention leases to their dedicated state file.
357+
* Persists the current retention leases to their dedicated state file. If this version of the retention leases are already persisted
358+
* then persistence is skipped.
346359
*
347360
* @param path the path to the directory containing the state file
348361
* @throws WriteStateException if an exception occurs writing the state file
@@ -351,10 +364,16 @@ public void persistRetentionLeases(final Path path) throws WriteStateException {
351364
synchronized (retentionLeasePersistenceLock) {
352365
final RetentionLeases currentRetentionLeases;
353366
synchronized (this) {
367+
if (retentionLeases.supersedes(persistedRetentionLeasesPrimaryTerm, persistedRetentionLeasesVersion) == false) {
368+
logger.trace("skipping persisting retention leases [{}], already persisted", retentionLeases);
369+
return;
370+
}
354371
currentRetentionLeases = retentionLeases;
355372
}
356373
logger.trace("persisting retention leases [{}]", currentRetentionLeases);
357374
RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path);
375+
persistedRetentionLeasesPrimaryTerm = currentRetentionLeases.primaryTerm();
376+
persistedRetentionLeasesVersion = currentRetentionLeases.version();
358377
}
359378
}
360379

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java

+17-3
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,27 @@ public long version() {
6969

7070
/**
7171
* Checks if this retention leases collection supersedes the specified retention leases collection. A retention leases collection
72-
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher
72+
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher.
7373
*
7474
* @param that the retention leases collection to test against
7575
* @return true if this retention leases collection supercedes the specified retention lease collection, otherwise false
7676
*/
77-
public boolean supersedes(final RetentionLeases that) {
78-
return primaryTerm > that.primaryTerm || primaryTerm == that.primaryTerm && version > that.version;
77+
boolean supersedes(final RetentionLeases that) {
78+
return supersedes(that.primaryTerm, that.version);
79+
}
80+
81+
/**
82+
* Checks if this retention leases collection would supersede a retention leases collection with the specified primary term and version.
83+
* A retention leases collection supersedes another retention leases collection if its primary term is higher, or if for equal primary
84+
* terms its version is higher.
85+
*
86+
* @param primaryTerm the primary term
87+
* @param version the version
88+
* @return true if this retention leases collection would supercedes a retention lease collection with the specified primary term and
89+
* version
90+
*/
91+
boolean supersedes(final long primaryTerm, final long version) {
92+
return this.primaryTerm > primaryTerm || this.primaryTerm == primaryTerm && this.version > version;
7993
}
8094

8195
private final Map<String, RetentionLease> leases;

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java

+44
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.collect.Tuple;
2525
import org.elasticsearch.common.settings.Settings;
2626
import org.elasticsearch.common.unit.TimeValue;
27+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2728
import org.elasticsearch.gateway.WriteStateException;
2829
import org.elasticsearch.index.IndexSettings;
2930
import org.elasticsearch.index.shard.ShardId;
@@ -499,6 +500,49 @@ public void testLoadAndPersistRetentionLeases() throws IOException {
499500
assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
500501
}
501502

503+
public void testUnnecessaryPersistenceOfRetentionLeases() throws IOException {
504+
final AllocationId allocationId = AllocationId.newInitializing();
505+
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
506+
final ReplicationTracker replicationTracker = new ReplicationTracker(
507+
new ShardId("test", "_na", 0),
508+
allocationId.getId(),
509+
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
510+
primaryTerm,
511+
UNASSIGNED_SEQ_NO,
512+
value -> {},
513+
() -> 0L,
514+
(leases, listener) -> {});
515+
replicationTracker.updateFromMaster(
516+
randomNonNegativeLong(),
517+
Collections.singleton(allocationId.getId()),
518+
routingTable(Collections.emptySet(), allocationId),
519+
Collections.emptySet());
520+
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
521+
final int length = randomIntBetween(0, 8);
522+
for (int i = 0; i < length; i++) {
523+
if (rarely() && primaryTerm < Long.MAX_VALUE) {
524+
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
525+
replicationTracker.setOperationPrimaryTerm(primaryTerm);
526+
}
527+
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
528+
replicationTracker.addRetentionLease(
529+
Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {}));
530+
}
531+
532+
final Path path = createTempDir();
533+
replicationTracker.persistRetentionLeases(path);
534+
535+
final Tuple<RetentionLeases, Long> retentionLeasesWithGeneration =
536+
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);
537+
538+
replicationTracker.persistRetentionLeases(path);
539+
final Tuple<RetentionLeases, Long> retentionLeasesWithGenerationAfterUnnecessaryPersistence =
540+
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);
541+
542+
assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v1(), equalTo(retentionLeasesWithGeneration.v1()));
543+
assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v2(), equalTo(retentionLeasesWithGeneration.v2()));
544+
}
545+
502546
/**
503547
* Test that we correctly synchronize writing the retention lease state file in {@link ReplicationTracker#persistRetentionLeases(Path)}.
504548
* This test can fail without the synchronization block in that method.

server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java

+4
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ public void testSupersedesByPrimaryTerm() {
6060
final long higherPrimaryTerm = randomLongBetween(lowerPrimaryTerm + 1, Long.MAX_VALUE);
6161
final RetentionLeases right = new RetentionLeases(higherPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList());
6262
assertTrue(right.supersedes(left));
63+
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
6364
assertFalse(left.supersedes(right));
65+
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
6466
}
6567

6668
public void testSupersedesByVersion() {
@@ -70,7 +72,9 @@ public void testSupersedesByVersion() {
7072
final RetentionLeases left = new RetentionLeases(primaryTerm, lowerVersion, Collections.emptyList());
7173
final RetentionLeases right = new RetentionLeases(primaryTerm, higherVersion, Collections.emptyList());
7274
assertTrue(right.supersedes(left));
75+
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
7376
assertFalse(left.supersedes(right));
77+
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
7478
}
7579

7680
public void testRetentionLeasesRejectsDuplicates() {

0 commit comments

Comments
 (0)