Skip to content

Commit 32b70ed

Browse files
committed
Avoid unnecessary persistence of retention leases (#42299)
Today we are persisting the retention leases at least every thirty seconds by a scheduled background sync. This sync causes an fsync to disk and when there are a large number of shards allocated to slow disks, these fsyncs can pile up and can severely impact the system. This commit addresses this by only persisting and fsyncing the retention leases if they have changed since the last time that we persisted and fsynced the retention leases.
1 parent 7e4d3c6 commit 32b70ed

File tree

4 files changed

+85
-4
lines changed

4 files changed

+85
-4
lines changed

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,18 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
180180
*/
181181
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;
182182

183+
/**
184+
* The primary term of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
185+
* leases.
186+
*/
187+
private long persistedRetentionLeasesPrimaryTerm;
188+
189+
/**
190+
* The version of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
191+
* leases.
192+
*/
193+
private long persistedRetentionLeasesVersion;
194+
183195
/**
184196
* Get all retention leases tracked on this shard.
185197
*
@@ -342,7 +354,8 @@ public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
342354
private final Object retentionLeasePersistenceLock = new Object();
343355

344356
/**
345-
* Persists the current retention leases to their dedicated state file.
357+
* Persists the current retention leases to their dedicated state file. If this version of the retention leases is already persisted
358+
* then persistence is skipped.
346359
*
347360
* @param path the path to the directory containing the state file
348361
* @throws WriteStateException if an exception occurs writing the state file
@@ -351,10 +364,16 @@ public void persistRetentionLeases(final Path path) throws WriteStateException {
351364
synchronized (retentionLeasePersistenceLock) {
352365
final RetentionLeases currentRetentionLeases;
353366
synchronized (this) {
367+
if (retentionLeases.supersedes(persistedRetentionLeasesPrimaryTerm, persistedRetentionLeasesVersion) == false) {
368+
logger.trace("skipping persisting retention leases [{}], already persisted", retentionLeases);
369+
return;
370+
}
354371
currentRetentionLeases = retentionLeases;
355372
}
356373
logger.trace("persisting retention leases [{}]", currentRetentionLeases);
357374
RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path);
375+
persistedRetentionLeasesPrimaryTerm = currentRetentionLeases.primaryTerm();
376+
persistedRetentionLeasesVersion = currentRetentionLeases.version();
358377
}
359378
}
360379

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java

+17-3
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,27 @@ public long version() {
7070

7171
/**
7272
* Checks if this retention leases collection supersedes the specified retention leases collection. A retention leases collection
73-
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher
73+
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher.
7474
*
7575
* @param that the retention leases collection to test against
7676
* @return true if this retention leases collection supersedes the specified retention leases collection, otherwise false
7777
*/
78-
public boolean supersedes(final RetentionLeases that) {
79-
return primaryTerm > that.primaryTerm || primaryTerm == that.primaryTerm && version > that.version;
78+
boolean supersedes(final RetentionLeases that) {
79+
return supersedes(that.primaryTerm, that.version);
80+
}
81+
82+
/**
83+
* Checks if this retention leases collection would supersede a retention leases collection with the specified primary term and version.
84+
* A retention leases collection supersedes another retention leases collection if its primary term is higher, or if for equal primary
85+
* terms its version is higher.
86+
*
87+
* @param primaryTerm the primary term
88+
* @param version the version
89+
* @return true if this retention leases collection would supersede a retention leases collection with the specified primary term and
90+
* version
91+
*/
92+
boolean supersedes(final long primaryTerm, final long version) {
93+
return this.primaryTerm > primaryTerm || this.primaryTerm == primaryTerm && this.version > version;
8094
}
8195

8296
private final Map<String, RetentionLease> leases;

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java

+44
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.collect.Tuple;
2525
import org.elasticsearch.common.settings.Settings;
2626
import org.elasticsearch.common.unit.TimeValue;
27+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2728
import org.elasticsearch.gateway.WriteStateException;
2829
import org.elasticsearch.index.IndexSettings;
2930
import org.elasticsearch.index.shard.ShardId;
@@ -499,6 +500,49 @@ public void testLoadAndPersistRetentionLeases() throws IOException {
499500
assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
500501
}
501502

503+
public void testUnnecessaryPersistenceOfRetentionLeases() throws IOException {
504+
final AllocationId allocationId = AllocationId.newInitializing();
505+
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
506+
final ReplicationTracker replicationTracker = new ReplicationTracker(
507+
new ShardId("test", "_na", 0),
508+
allocationId.getId(),
509+
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
510+
primaryTerm,
511+
UNASSIGNED_SEQ_NO,
512+
value -> {},
513+
() -> 0L,
514+
(leases, listener) -> {});
515+
replicationTracker.updateFromMaster(
516+
randomNonNegativeLong(),
517+
Collections.singleton(allocationId.getId()),
518+
routingTable(Collections.emptySet(), allocationId),
519+
Collections.emptySet());
520+
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
521+
final int length = randomIntBetween(0, 8);
522+
for (int i = 0; i < length; i++) {
523+
if (rarely() && primaryTerm < Long.MAX_VALUE) {
524+
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
525+
replicationTracker.setOperationPrimaryTerm(primaryTerm);
526+
}
527+
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
528+
replicationTracker.addRetentionLease(
529+
Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {}));
530+
}
531+
532+
final Path path = createTempDir();
533+
replicationTracker.persistRetentionLeases(path);
534+
535+
final Tuple<RetentionLeases, Long> retentionLeasesWithGeneration =
536+
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);
537+
538+
replicationTracker.persistRetentionLeases(path);
539+
final Tuple<RetentionLeases, Long> retentionLeasesWithGenerationAfterUnnecessaryPersistence =
540+
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);
541+
542+
assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v1(), equalTo(retentionLeasesWithGeneration.v1()));
543+
assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v2(), equalTo(retentionLeasesWithGeneration.v2()));
544+
}
545+
502546
/**
503547
* Test that we correctly synchronize writing the retention lease state file in {@link ReplicationTracker#persistRetentionLeases(Path)}.
504548
* This test can fail without the synchronization block in that method.

server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java

+4
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ public void testSupersedesByPrimaryTerm() {
6060
final long higherPrimaryTerm = randomLongBetween(lowerPrimaryTerm + 1, Long.MAX_VALUE);
6161
final RetentionLeases right = new RetentionLeases(higherPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList());
6262
assertTrue(right.supersedes(left));
63+
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
6364
assertFalse(left.supersedes(right));
65+
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
6466
}
6567

6668
public void testSupersedesByVersion() {
@@ -70,7 +72,9 @@ public void testSupersedesByVersion() {
7072
final RetentionLeases left = new RetentionLeases(primaryTerm, lowerVersion, Collections.emptyList());
7173
final RetentionLeases right = new RetentionLeases(primaryTerm, higherVersion, Collections.emptyList());
7274
assertTrue(right.supersedes(left));
75+
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
7376
assertFalse(left.supersedes(right));
77+
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
7478
}
7579

7680
public void testRetentionLeasesRejectsDuplicates() {

0 commit comments

Comments
 (0)