Skip to content

Commit 5f9c8ba

Browse files
authored
Avoid unnecessary persistence of retention leases (#42299)
Today we are persisting the retention leases at least every thirty seconds by a scheduled background sync. This sync causes an fsync to disk and when there are a large number of shards allocated to slow disks, these fsyncs can pile up and can severely impact the system. This commit addresses this by only persisting and fsyncing the retention leases if they have changed since the last time that we persisted and fsynced the retention leases.
1 parent d150880 commit 5f9c8ba

File tree

4 files changed

+85
-6
lines changed

4 files changed

+85
-6
lines changed

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

+20-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import com.carrotsearch.hppc.ObjectLongHashMap;
2323
import com.carrotsearch.hppc.ObjectLongMap;
24-
2524
import org.elasticsearch.Version;
2625
import org.elasticsearch.action.ActionListener;
2726
import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -181,6 +180,18 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
181180
*/
182181
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;
183182

183+
/**
184+
* The primary term of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
185+
* leases.
186+
*/
187+
private long persistedRetentionLeasesPrimaryTerm;
188+
189+
/**
190+
* The version of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
191+
* leases.
192+
*/
193+
private long persistedRetentionLeasesVersion;
194+
184195
/**
185196
* Get all retention leases tracked on this shard.
186197
*
@@ -343,7 +354,8 @@ public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
343354
private final Object retentionLeasePersistenceLock = new Object();
344355

345356
/**
346-
* Persists the current retention leases to their dedicated state file.
357+
* Persists the current retention leases to their dedicated state file. If this version of the retention leases are already persisted
358+
* then persistence is skipped.
347359
*
348360
* @param path the path to the directory containing the state file
349361
* @throws WriteStateException if an exception occurs writing the state file
@@ -352,10 +364,16 @@ public void persistRetentionLeases(final Path path) throws WriteStateException {
352364
synchronized (retentionLeasePersistenceLock) {
353365
final RetentionLeases currentRetentionLeases;
354366
synchronized (this) {
367+
if (retentionLeases.supersedes(persistedRetentionLeasesPrimaryTerm, persistedRetentionLeasesVersion) == false) {
368+
logger.trace("skipping persisting retention leases [{}], already persisted", retentionLeases);
369+
return;
370+
}
355371
currentRetentionLeases = retentionLeases;
356372
}
357373
logger.trace("persisting retention leases [{}]", currentRetentionLeases);
358374
RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path);
375+
persistedRetentionLeasesPrimaryTerm = currentRetentionLeases.primaryTerm();
376+
persistedRetentionLeasesVersion = currentRetentionLeases.version();
359377
}
360378
}
361379

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,27 @@ public long version() {
7070

7171
/**
7272
* Checks if this retention leases collection supersedes the specified retention leases collection. A retention leases collection
73-
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher
73+
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher.
7474
*
7575
* @param that the retention leases collection to test against
7676
* @return true if this retention leases collection supercedes the specified retention lease collection, otherwise false
7777
*/
78-
public boolean supersedes(final RetentionLeases that) {
79-
return primaryTerm > that.primaryTerm || primaryTerm == that.primaryTerm && version > that.version;
78+
boolean supersedes(final RetentionLeases that) {
79+
return supersedes(that.primaryTerm, that.version);
80+
}
81+
82+
/**
83+
* Checks if this retention leases collection would supersede a retention leases collection with the specified primary term and version.
84+
* A retention leases collection supersedes another retention leases collection if its primary term is higher, or if for equal primary
85+
* terms its version is higher.
86+
*
87+
* @param primaryTerm the primary term
88+
* @param version the version
89+
* @return true if this retention leases collection would supercedes a retention lease collection with the specified primary term and
90+
* version
91+
*/
92+
boolean supersedes(final long primaryTerm, final long version) {
93+
return this.primaryTerm > primaryTerm || this.primaryTerm == primaryTerm && this.version > version;
8094
}
8195

8296
private final Map<String, RetentionLease> leases;
@@ -203,7 +217,7 @@ public static RetentionLeases fromXContent(final XContentParser parser) {
203217
return PARSER.apply(parser, null);
204218
}
205219

206-
static final MetaDataStateFormat<RetentionLeases> FORMAT = new MetaDataStateFormat<RetentionLeases>("retention-leases-") {
220+
static final MetaDataStateFormat<RetentionLeases> FORMAT = new MetaDataStateFormat<>("retention-leases-") {
207221

208222
@Override
209223
public void toXContent(final XContentBuilder builder, final RetentionLeases retentionLeases) throws IOException {

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java

+43
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.collect.Tuple;
2525
import org.elasticsearch.common.settings.Settings;
2626
import org.elasticsearch.common.unit.TimeValue;
27+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2728
import org.elasticsearch.gateway.WriteStateException;
2829
import org.elasticsearch.index.IndexSettings;
2930
import org.elasticsearch.index.shard.ShardId;
@@ -489,6 +490,48 @@ public void testLoadAndPersistRetentionLeases() throws IOException {
489490
assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
490491
}
491492

493+
public void testUnnecessaryPersistenceOfRetentionLeases() throws IOException {
494+
final AllocationId allocationId = AllocationId.newInitializing();
495+
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
496+
final ReplicationTracker replicationTracker = new ReplicationTracker(
497+
new ShardId("test", "_na", 0),
498+
allocationId.getId(),
499+
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
500+
primaryTerm,
501+
UNASSIGNED_SEQ_NO,
502+
value -> {},
503+
() -> 0L,
504+
(leases, listener) -> {});
505+
replicationTracker.updateFromMaster(
506+
randomNonNegativeLong(),
507+
Collections.singleton(allocationId.getId()),
508+
routingTable(Collections.emptySet(), allocationId));
509+
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
510+
final int length = randomIntBetween(0, 8);
511+
for (int i = 0; i < length; i++) {
512+
if (rarely() && primaryTerm < Long.MAX_VALUE) {
513+
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
514+
replicationTracker.setOperationPrimaryTerm(primaryTerm);
515+
}
516+
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
517+
replicationTracker.addRetentionLease(
518+
Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {}));
519+
}
520+
521+
final Path path = createTempDir();
522+
replicationTracker.persistRetentionLeases(path);
523+
524+
final Tuple<RetentionLeases, Long> retentionLeasesWithGeneration =
525+
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);
526+
527+
replicationTracker.persistRetentionLeases(path);
528+
final Tuple<RetentionLeases, Long> retentionLeasesWithGenerationAfterUnnecessaryPersistence =
529+
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);
530+
531+
assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v1(), equalTo(retentionLeasesWithGeneration.v1()));
532+
assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v2(), equalTo(retentionLeasesWithGeneration.v2()));
533+
}
534+
492535
/**
493536
* Test that we correctly synchronize writing the retention lease state file in {@link ReplicationTracker#persistRetentionLeases(Path)}.
494537
* This test can fail without the synchronization block in that method.

server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java

+4
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ public void testSupersedesByPrimaryTerm() {
6060
final long higherPrimaryTerm = randomLongBetween(lowerPrimaryTerm + 1, Long.MAX_VALUE);
6161
final RetentionLeases right = new RetentionLeases(higherPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList());
6262
assertTrue(right.supersedes(left));
63+
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
6364
assertFalse(left.supersedes(right));
65+
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
6466
}
6567

6668
public void testSupersedesByVersion() {
@@ -70,7 +72,9 @@ public void testSupersedesByVersion() {
7072
final RetentionLeases left = new RetentionLeases(primaryTerm, lowerVersion, Collections.emptyList());
7173
final RetentionLeases right = new RetentionLeases(primaryTerm, higherVersion, Collections.emptyList());
7274
assertTrue(right.supersedes(left));
75+
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
7376
assertFalse(left.supersedes(right));
77+
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
7478
}
7579

7680
public void testRetentionLeasesRejectsDuplicates() {

0 commit comments

Comments
 (0)