Skip to content

Commit 58a7716

Browse files
authored
Enable removal of retention leases (elastic#38751)
This commit introduces the ability to remove retention leases. Explicit removal will be needed to manage retention leases used to increase the likelihood of operation-based recoveries syncing, and for consumers such as ILM.
1 parent 3fc1c94 commit 58a7716

File tree

5 files changed

+234
-7
lines changed

5 files changed

+234
-7
lines changed

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
156156
private final LongSupplier currentTimeMillisSupplier;
157157

158158
/**
159-
* A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync
160-
* retention leases to replicas.
159+
* A callback when a new retention lease is created or an existing retention lease is removed. In practice, this callback invokes the
160+
* retention lease sync action, to sync retention leases to replicas.
161161
*/
162-
private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onAddRetentionLease;
162+
private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases;
163163

164164
/**
165165
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
@@ -246,7 +246,7 @@ public RetentionLease addRetentionLease(
246246
Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList()));
247247
currentRetentionLeases = retentionLeases;
248248
}
249-
onAddRetentionLease.accept(currentRetentionLeases, listener);
249+
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
250250
return retentionLease;
251251
}
252252

@@ -283,6 +283,29 @@ public synchronized RetentionLease renewRetentionLease(final String id, final lo
283283
return retentionLease;
284284
}
285285

286+
/**
287+
* Removes an existing retention lease; throws {@link IllegalArgumentException} if no retention lease with the given ID exists.
288+
*
289+
* @param id the identifier of the retention lease
290+
* @param listener the callback when the retention lease is successfully removed and synced to replicas
291+
*/
292+
public void removeRetentionLease(final String id, final ActionListener<ReplicationResponse> listener) {
293+
Objects.requireNonNull(listener);
294+
final RetentionLeases currentRetentionLeases;
295+
synchronized (this) {
296+
assert primaryMode;
297+
if (retentionLeases.contains(id) == false) {
298+
throw new IllegalArgumentException("retention lease with ID [" + id + "] does not exist");
299+
}
300+
retentionLeases = new RetentionLeases(
301+
operationPrimaryTerm,
302+
retentionLeases.version() + 1,
303+
retentionLeases.leases().stream().filter(lease -> lease.id().equals(id) == false).collect(Collectors.toList()));
304+
currentRetentionLeases = retentionLeases;
305+
}
306+
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
307+
}
308+
286309
/**
287310
* Updates retention leases on a replica.
288311
*
@@ -563,7 +586,7 @@ private static long inSyncCheckpointStates(
563586
* @param indexSettings the index settings
564587
* @param operationPrimaryTerm the current primary term
565588
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
566-
* @param onAddRetentionLease a callback when a new retention lease is created or an existing retention lease expires
589+
* @param onSyncRetentionLeases a callback when a new retention lease is created, or an existing retention lease is removed or expires
567590
*/
568591
public ReplicationTracker(
569592
final ShardId shardId,
@@ -573,7 +596,7 @@ public ReplicationTracker(
573596
final long globalCheckpoint,
574597
final LongConsumer onGlobalCheckpointUpdated,
575598
final LongSupplier currentTimeMillisSupplier,
576-
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onAddRetentionLease) {
599+
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
577600
super(shardId, indexSettings);
578601
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
579602
this.shardAllocationId = allocationId;
@@ -585,7 +608,7 @@ public ReplicationTracker(
585608
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
586609
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
587610
this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
588-
this.onAddRetentionLease = Objects.requireNonNull(onAddRetentionLease);
611+
this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases);
589612
this.pendingInSync = new HashSet<>();
590613
this.routingTable = null;
591614
this.replicationGroup = null;

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1956,6 +1956,19 @@ public RetentionLease renewRetentionLease(final String id, final long retainingS
19561956
return replicationTracker.renewRetentionLease(id, retainingSequenceNumber, source);
19571957
}
19581958

1959+
/**
1960+
* Removes an existing retention lease.
1961+
*
1962+
* @param id the identifier of the retention lease
1963+
* @param listener the callback when the retention lease is successfully removed and synced to replicas
1964+
*/
1965+
public void removeRetentionLease(final String id, final ActionListener<ReplicationResponse> listener) {
1966+
Objects.requireNonNull(listener);
1967+
assert assertPrimaryMode();
1968+
verifyNotClosed();
1969+
replicationTracker.removeRetentionLease(id, listener);
1970+
}
1971+
19591972
/**
19601973
* Updates retention leases on a replica.
19611974
*

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,105 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() {
137137
}
138138
}
139139

140+
public void testRemoveRetentionLease() {
141+
final AllocationId allocationId = AllocationId.newInitializing();
142+
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
143+
final ReplicationTracker replicationTracker = new ReplicationTracker(
144+
new ShardId("test", "_na", 0),
145+
allocationId.getId(),
146+
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
147+
primaryTerm,
148+
UNASSIGNED_SEQ_NO,
149+
value -> {},
150+
() -> 0L,
151+
(leases, listener) -> {});
152+
replicationTracker.updateFromMaster(
153+
randomNonNegativeLong(),
154+
Collections.singleton(allocationId.getId()),
155+
routingTable(Collections.emptySet(), allocationId),
156+
Collections.emptySet());
157+
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
158+
final int length = randomIntBetween(0, 8);
159+
final long[] minimumRetainingSequenceNumbers = new long[length];
160+
for (int i = 0; i < length; i++) {
161+
if (rarely() && primaryTerm < Long.MAX_VALUE) {
162+
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
163+
replicationTracker.setOperationPrimaryTerm(primaryTerm);
164+
}
165+
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
166+
replicationTracker.addRetentionLease(
167+
Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
168+
}
169+
170+
for (int i = 0; i < length; i++) {
171+
if (rarely() && primaryTerm < Long.MAX_VALUE) {
172+
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
173+
replicationTracker.setOperationPrimaryTerm(primaryTerm);
174+
}
175+
/*
176+
* Remove from the end since it will make the following assertion easier; we want to ensure that only the intended lease was
177+
* removed.
178+
*/
179+
replicationTracker.removeRetentionLease(Integer.toString(length - i - 1), ActionListener.wrap(() -> {}));
180+
assertRetentionLeases(
181+
replicationTracker,
182+
length - i - 1,
183+
minimumRetainingSequenceNumbers,
184+
primaryTerm,
185+
1 + length + i,
186+
true,
187+
false);
188+
}
189+
}
190+
191+
public void testRemoveRetentionLeaseCausesRetentionLeaseSync() {
192+
final AllocationId allocationId = AllocationId.newInitializing();
193+
final Map<String, Long> retainingSequenceNumbers = new HashMap<>();
194+
final AtomicBoolean invoked = new AtomicBoolean();
195+
final AtomicReference<ReplicationTracker> reference = new AtomicReference<>();
196+
final ReplicationTracker replicationTracker = new ReplicationTracker(
197+
new ShardId("test", "_na", 0),
198+
allocationId.getId(),
199+
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
200+
randomNonNegativeLong(),
201+
UNASSIGNED_SEQ_NO,
202+
value -> {},
203+
() -> 0L,
204+
(leases, listener) -> {
205+
// we do not want to hold a lock on the replication tracker in the callback!
206+
assertFalse(Thread.holdsLock(reference.get()));
207+
invoked.set(true);
208+
assertThat(
209+
leases.leases()
210+
.stream()
211+
.collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)),
212+
equalTo(retainingSequenceNumbers));
213+
});
214+
reference.set(replicationTracker);
215+
replicationTracker.updateFromMaster(
216+
randomNonNegativeLong(),
217+
Collections.singleton(allocationId.getId()),
218+
routingTable(Collections.emptySet(), allocationId),
219+
Collections.emptySet());
220+
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
221+
222+
final int length = randomIntBetween(0, 8);
223+
for (int i = 0; i < length; i++) {
224+
final String id = randomAlphaOfLength(8);
225+
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
226+
retainingSequenceNumbers.put(id, retainingSequenceNumber);
227+
replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {}));
228+
// assert that the new retention lease callback was invoked
229+
assertTrue(invoked.get());
230+
231+
// reset the invocation marker so that we can assert the callback was invoked when removing the lease
232+
invoked.set(false);
233+
retainingSequenceNumbers.remove(id);
234+
replicationTracker.removeRetentionLease(id, ActionListener.wrap(() -> {}));
235+
assertTrue(invoked.get());
236+
}
237+
}
238+
140239
public void testExpirationOnPrimary() {
141240
runExpirationTest(true);
142241
}

server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,68 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {
126126
}
127127
}
128128

129+
public void testRetentionLeaseSyncedOnRemove() throws Exception {
130+
final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
131+
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
132+
final Settings settings = Settings.builder()
133+
.put("index.number_of_shards", 1)
134+
.put("index.number_of_replicas", numberOfReplicas)
135+
.build();
136+
createIndex("index", settings);
137+
ensureGreen("index");
138+
final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
139+
final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
140+
final IndexShard primary = internalCluster()
141+
.getInstance(IndicesService.class, primaryShardNodeName)
142+
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
143+
final int length = randomIntBetween(1, 8);
144+
final Map<String, RetentionLease> currentRetentionLeases = new HashMap<>();
145+
for (int i = 0; i < length; i++) {
146+
final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8));
147+
final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
148+
final String source = randomAlphaOfLength(8);
149+
final CountDownLatch latch = new CountDownLatch(1);
150+
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
151+
// simulate a peer recovery which locks the soft deletes policy on the primary
152+
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
153+
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
154+
latch.await();
155+
retentionLock.close();
156+
}
157+
158+
for (int i = 0; i < length; i++) {
159+
final String id = randomFrom(currentRetentionLeases.keySet());
160+
final CountDownLatch latch = new CountDownLatch(1);
161+
primary.removeRetentionLease(id, ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())));
162+
// simulate a peer recovery which locks the soft deletes policy on the primary
163+
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
164+
currentRetentionLeases.remove(id);
165+
latch.await();
166+
retentionLock.close();
167+
168+
// check retention leases have been committed on the primary
169+
final RetentionLeases primaryCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
170+
primary.commitStats().getUserData().get(Engine.RETENTION_LEASES));
171+
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primaryCommittedRetentionLeases)));
172+
173+
// check current retention leases have been synced to all replicas
174+
for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
175+
final String replicaShardNodeId = replicaShard.currentNodeId();
176+
final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName();
177+
final IndexShard replica = internalCluster()
178+
.getInstance(IndicesService.class, replicaShardNodeName)
179+
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
180+
final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
181+
assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));
182+
183+
// check retention leases have been committed on the replica
184+
final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
185+
replica.commitStats().getUserData().get(Engine.RETENTION_LEASES));
186+
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases)));
187+
}
188+
}
189+
}
190+
129191
public void testRetentionLeasesSyncOnExpiration() throws Exception {
130192
final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
131193
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);

server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,36 @@ public void testAddOrRenewRetentionLease() throws IOException {
114114
}
115115
}
116116

117+
public void testRemoveRetentionLease() throws IOException {
118+
final IndexShard indexShard = newStartedShard(true);
119+
final long primaryTerm = indexShard.getOperationPrimaryTerm();
120+
try {
121+
final int length = randomIntBetween(0, 8);
122+
final long[] minimumRetainingSequenceNumbers = new long[length];
123+
for (int i = 0; i < length; i++) {
124+
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
125+
indexShard.addRetentionLease(
126+
Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
127+
assertRetentionLeases(
128+
indexShard, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false);
129+
}
130+
131+
for (int i = 0; i < length; i++) {
132+
indexShard.removeRetentionLease(Integer.toString(length - i - 1), ActionListener.wrap(() -> {}));
133+
assertRetentionLeases(
134+
indexShard,
135+
length - i - 1,
136+
minimumRetainingSequenceNumbers,
137+
primaryTerm,
138+
1 + length + i,
139+
true,
140+
false);
141+
}
142+
} finally {
143+
closeShards(indexShard);
144+
}
145+
}
146+
117147
public void testExpirationOnPrimary() throws IOException {
118148
runExpirationTest(true);
119149
}

0 commit comments

Comments
 (0)