Skip to content

Commit 8e95780

Browse files
authored
Soft-deletes policy should always fetch latest leases (#37940)
If a new retention lease is added while a primary's soft-deletes policy is locked for peer-recovery, that lease won't be baked into the Lucene commit. Relates #37165 Relates #37375
1 parent 68ed72b commit 8e95780

File tree

3 files changed

+33
-5
lines changed

3 files changed

+33
-5
lines changed

server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ final class SoftDeletesPolicy {
4646
private long retentionOperations;
4747
// The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
4848
private long minRetainedSeqNo;
49-
private Collection<RetentionLease> retentionLeases;
5049
// provides the retention leases used to calculate the minimum sequence number to retain
5150
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
5251

@@ -59,7 +58,6 @@ final class SoftDeletesPolicy {
5958
this.retentionOperations = retentionOperations;
6059
this.minRetainedSeqNo = minRetainedSeqNo;
6160
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
62-
retentionLeases = retentionLeasesSupplier.get();
6361
this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
6462
this.retentionLockCount = 0;
6563
}
@@ -113,6 +111,11 @@ synchronized long getMinRetainedSeqNo() {
113111
}
114112

115113
public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy() {
114+
/*
115+
* When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is
116+
* locked for peer recovery.
117+
*/
118+
final Collection<RetentionLease> retentionLeases = retentionLeasesSupplier.get();
116119
// do not advance if the retention lock is held
117120
if (retentionLockCount == 0) {
118121
/*
@@ -126,7 +129,6 @@ public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy()
126129
*/
127130

128131
// calculate the minimum sequence number to retain based on retention leases
129-
retentionLeases = retentionLeasesSupplier.get();
130132
final long minimumRetainingSequenceNumber = retentionLeases
131133
.stream()
132134
.mapToLong(RetentionLease::retainingSequenceNumber)

server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java

+24-2
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,21 @@
2424
import org.apache.lucene.search.Query;
2525
import org.elasticsearch.common.lease.Releasable;
2626
import org.elasticsearch.index.seqno.RetentionLease;
27-
import org.elasticsearch.index.seqno.SequenceNumbers;
2827
import org.elasticsearch.test.ESTestCase;
2928

3029
import java.util.ArrayList;
3130
import java.util.Arrays;
3231
import java.util.Collection;
32+
import java.util.Collections;
3333
import java.util.HashSet;
3434
import java.util.List;
3535
import java.util.Set;
3636
import java.util.concurrent.atomic.AtomicLong;
3737
import java.util.function.Supplier;
3838

39+
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
40+
import static org.hamcrest.Matchers.contains;
41+
import static org.hamcrest.Matchers.empty;
3942
import static org.hamcrest.Matchers.equalTo;
4043
import static org.hamcrest.Matchers.instanceOf;
4144

@@ -46,7 +49,7 @@ public class SoftDeletesPolicyTests extends ESTestCase {
4649
*/
4750
public void testSoftDeletesRetentionLock() {
4851
long retainedOps = between(0, 10000);
49-
AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
52+
AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
5053
final AtomicLong[] retainingSequenceNumbers = new AtomicLong[randomIntBetween(0, 8)];
5154
for (int i = 0; i < retainingSequenceNumbers.length; i++) {
5255
retainingSequenceNumbers[i] = new AtomicLong();
@@ -116,4 +119,23 @@ public void testSoftDeletesRetentionLock() {
116119
assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo));
117120
}
118121

122+
public void testAlwaysFetchLatestRetentionLeases() {
123+
final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
124+
final Collection<RetentionLease> leases = new ArrayList<>();
125+
final int numLeases = randomIntBetween(0, 10);
126+
for (int i = 0; i < numLeases; i++) {
127+
leases.add(new RetentionLease(Integer.toString(i), randomLongBetween(0, 1000), randomNonNegativeLong(), "test"));
128+
}
129+
final Supplier<Collection<RetentionLease>> leasesSupplier = () -> Collections.unmodifiableCollection(new ArrayList<>(leases));
130+
final SoftDeletesPolicy policy =
131+
new SoftDeletesPolicy(globalCheckpoint::get, randomIntBetween(1, 1000), randomIntBetween(0, 1000), leasesSupplier);
132+
if (randomBoolean()) {
133+
policy.acquireRetentionLock();
134+
}
135+
if (numLeases == 0) {
136+
assertThat(policy.getRetentionPolicy().v2(), empty());
137+
} else {
138+
assertThat(policy.getRetentionPolicy().v2(), contains(leases.toArray(new RetentionLease[0])));
139+
}
140+
}
119141
}

server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java

+4
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.test.ESIntegTestCase;
3333
import org.elasticsearch.threadpool.ThreadPool;
3434

35+
import java.io.Closeable;
3536
import java.util.Collection;
3637
import java.util.HashMap;
3738
import java.util.Map;
@@ -70,8 +71,11 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {
7071
final String source = randomAlphaOfLength(8);
7172
final CountDownLatch latch = new CountDownLatch(1);
7273
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
74+
// simulate a peer-recovery which locks the soft-deletes policy on the primary.
75+
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
7376
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
7477
latch.await();
78+
retentionLock.close();
7579

7680
// check retention leases have been committed on the primary
7781
final Collection<RetentionLease> primaryCommittedRetentionLeases = RetentionLease.decodeRetentionLeases(

0 commit comments

Comments
 (0)