Skip to content

Commit eb71750

Browse files
committed
Add retention leases replication tests (#38857)
This commit introduces retention leases to ESIndexLevelReplicationTestCase, then adds tests verifying that retention lease replication works correctly despite primary failover or out-of-order delivery of retention lease sync requests. Relates #37165
1 parent d5711de commit eb71750

File tree

8 files changed

+272
-44
lines changed

8 files changed

+272
-44
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+4
Original file line numberDiff line numberDiff line change
@@ -3161,4 +3161,8 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
31613161
public void verifyShardBeforeIndexClosing() throws IllegalStateException {
31623162
getEngine().verifyEngineBeforeIndexClosing();
31633163
}
3164+
3165+
RetentionLeaseSyncer getRetentionLeaseSyncer() {
3166+
return retentionLeaseSyncer;
3167+
}
31643168
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.replication;
21+
22+
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.support.PlainActionFuture;
24+
import org.elasticsearch.action.support.replication.ReplicationResponse;
25+
import org.elasticsearch.cluster.metadata.IndexMetaData;
26+
import org.elasticsearch.common.Randomness;
27+
import org.elasticsearch.common.settings.Settings;
28+
import org.elasticsearch.index.IndexSettings;
29+
import org.elasticsearch.index.seqno.RetentionLease;
30+
import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
31+
import org.elasticsearch.index.seqno.RetentionLeases;
32+
import org.elasticsearch.index.shard.IndexShard;
33+
import org.elasticsearch.index.shard.ShardId;
34+
35+
import java.util.ArrayList;
36+
import java.util.List;
37+
import java.util.concurrent.CountDownLatch;
38+
39+
import static org.hamcrest.Matchers.containsInAnyOrder;
40+
import static org.hamcrest.Matchers.equalTo;
41+
import static org.hamcrest.Matchers.hasSize;
42+
43+
public class RetentionLeasesReplicationTests extends ESIndexLevelReplicationTestCase {
44+
45+
public void testSimpleSyncRetentionLeases() throws Exception {
46+
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
47+
try (ReplicationGroup group = createGroup(between(0, 2), settings)) {
48+
group.startAll();
49+
List<RetentionLease> leases = new ArrayList<>();
50+
int iterations = between(1, 100);
51+
CountDownLatch latch = new CountDownLatch(iterations);
52+
for (int i = 0; i < iterations; i++) {
53+
if (leases.isEmpty() == false && rarely()) {
54+
RetentionLease leaseToRemove = randomFrom(leases);
55+
leases.remove(leaseToRemove);
56+
group.removeRetentionLease(leaseToRemove.id(), ActionListener.wrap(latch::countDown));
57+
} else {
58+
RetentionLease newLease = group.addRetentionLease(Integer.toString(i), randomNonNegativeLong(), "test-" + i,
59+
ActionListener.wrap(latch::countDown));
60+
leases.add(newLease);
61+
}
62+
}
63+
RetentionLeases leasesOnPrimary = group.getPrimary().getRetentionLeases();
64+
assertThat(leasesOnPrimary.version(), equalTo((long) iterations));
65+
assertThat(leasesOnPrimary.primaryTerm(), equalTo(group.getPrimary().getOperationPrimaryTerm()));
66+
assertThat(leasesOnPrimary.leases(), containsInAnyOrder(leases.toArray(new RetentionLease[0])));
67+
latch.await();
68+
for (IndexShard replica : group.getReplicas()) {
69+
assertThat(replica.getRetentionLeases(), equalTo(leasesOnPrimary));
70+
}
71+
}
72+
}
73+
74+
public void testOutOfOrderRetentionLeasesRequests() throws Exception {
75+
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
76+
int numberOfReplicas = between(1, 2);
77+
IndexMetaData indexMetaData = buildIndexMetaData(numberOfReplicas, settings, indexMapping);
78+
try (ReplicationGroup group = new ReplicationGroup(indexMetaData) {
79+
@Override
80+
protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, ActionListener<ReplicationResponse> listener) {
81+
listener.onResponse(new SyncRetentionLeasesResponse(new RetentionLeaseSyncAction.Request(shardId, leases)));
82+
}
83+
}) {
84+
group.startAll();
85+
int numLeases = between(1, 10);
86+
List<RetentionLeaseSyncAction.Request> requests = new ArrayList<>();
87+
for (int i = 0; i < numLeases; i++) {
88+
PlainActionFuture<ReplicationResponse> future = new PlainActionFuture<>();
89+
group.addRetentionLease(Integer.toString(i), randomNonNegativeLong(), "test-" + i, future);
90+
requests.add(((SyncRetentionLeasesResponse) future.actionGet()).syncRequest);
91+
}
92+
RetentionLeases leasesOnPrimary = group.getPrimary().getRetentionLeases();
93+
for (IndexShard replica : group.getReplicas()) {
94+
Randomness.shuffle(requests);
95+
requests.forEach(request -> group.executeRetentionLeasesSyncRequestOnReplica(request, replica));
96+
assertThat(replica.getRetentionLeases(), equalTo(leasesOnPrimary));
97+
}
98+
}
99+
}
100+
101+
public void testSyncRetentionLeasesWithPrimaryPromotion() throws Exception {
102+
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
103+
int numberOfReplicas = between(2, 4);
104+
IndexMetaData indexMetaData = buildIndexMetaData(numberOfReplicas, settings, indexMapping);
105+
try (ReplicationGroup group = new ReplicationGroup(indexMetaData) {
106+
@Override
107+
protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, ActionListener<ReplicationResponse> listener) {
108+
listener.onResponse(new SyncRetentionLeasesResponse(new RetentionLeaseSyncAction.Request(shardId, leases)));
109+
}
110+
}) {
111+
group.startAll();
112+
int numLeases = between(1, 100);
113+
IndexShard newPrimary = randomFrom(group.getReplicas());
114+
RetentionLeases latestRetentionLeasesOnNewPrimary = RetentionLeases.EMPTY;
115+
for (int i = 0; i < numLeases; i++) {
116+
PlainActionFuture<ReplicationResponse> addLeaseFuture = new PlainActionFuture<>();
117+
group.addRetentionLease(Integer.toString(i), randomNonNegativeLong(), "test-" + i, addLeaseFuture);
118+
RetentionLeaseSyncAction.Request request = ((SyncRetentionLeasesResponse) addLeaseFuture.actionGet()).syncRequest;
119+
for (IndexShard replica : randomSubsetOf(group.getReplicas())) {
120+
group.executeRetentionLeasesSyncRequestOnReplica(request, replica);
121+
if (newPrimary == replica) {
122+
latestRetentionLeasesOnNewPrimary = request.getRetentionLeases();
123+
}
124+
}
125+
}
126+
group.promoteReplicaToPrimary(newPrimary).get();
127+
// we need to make changes to retention leases to sync it to replicas
128+
// since we don't sync retention leases when promoting a new primary.
129+
PlainActionFuture<ReplicationResponse> newLeaseFuture = new PlainActionFuture<>();
130+
group.addRetentionLease("new-lease-after-promotion", randomNonNegativeLong(), "test", newLeaseFuture);
131+
RetentionLeases leasesOnPrimary = group.getPrimary().getRetentionLeases();
132+
assertThat(leasesOnPrimary.primaryTerm(), equalTo(group.getPrimary().getOperationPrimaryTerm()));
133+
assertThat(leasesOnPrimary.version(), equalTo(latestRetentionLeasesOnNewPrimary.version() + 1L));
134+
assertThat(leasesOnPrimary.leases(), hasSize(latestRetentionLeasesOnNewPrimary.leases().size() + 1));
135+
RetentionLeaseSyncAction.Request request = ((SyncRetentionLeasesResponse) newLeaseFuture.actionGet()).syncRequest;
136+
for (IndexShard replica : group.getReplicas()) {
137+
group.executeRetentionLeasesSyncRequestOnReplica(request, replica);
138+
}
139+
for (IndexShard replica : group.getReplicas()) {
140+
assertThat(replica.getRetentionLeases(), equalTo(leasesOnPrimary));
141+
}
142+
}
143+
}
144+
145+
static final class SyncRetentionLeasesResponse extends ReplicationResponse {
146+
final RetentionLeaseSyncAction.Request syncRequest;
147+
SyncRetentionLeasesResponse(RetentionLeaseSyncAction.Request syncRequest) {
148+
this.syncRequest = syncRequest;
149+
}
150+
}
151+
}

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

+24-26
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
import org.elasticsearch.index.mapper.SourceToParse;
102102
import org.elasticsearch.index.mapper.Uid;
103103
import org.elasticsearch.index.mapper.VersionFieldMapper;
104+
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
104105
import org.elasticsearch.index.seqno.RetentionLeases;
105106
import org.elasticsearch.index.seqno.SeqNoStats;
106107
import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -1046,8 +1047,8 @@ public void testGlobalCheckpointSync() throws IOException {
10461047
final IndexMetaData.Builder indexMetadata =
10471048
IndexMetaData.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
10481049
final AtomicBoolean synced = new AtomicBoolean();
1049-
final IndexShard primaryShard =
1050-
newShard(shardRouting, indexMetadata.build(), null, new InternalEngineFactory(), () -> synced.set(true));
1050+
final IndexShard primaryShard = newShard(
1051+
shardRouting, indexMetadata.build(), null, new InternalEngineFactory(), () -> synced.set(true), RetentionLeaseSyncer.EMPTY);
10511052
// add a replica
10521053
recoverShardFromStore(primaryShard);
10531054
final IndexShard replicaShard = newShard(shardId, false);
@@ -1462,9 +1463,8 @@ public String[] listAll() throws IOException {
14621463
};
14631464

14641465
try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) {
1465-
IndexShard shard = newShard(shardRouting, shardPath, metaData, i -> store,
1466-
null, new InternalEngineFactory(), () -> {
1467-
}, EMPTY_EVENT_LISTENER);
1466+
IndexShard shard = newShard(shardRouting, shardPath, metaData, i -> store, null, new InternalEngineFactory(),
1467+
() -> { }, RetentionLeaseSyncer.EMPTY, EMPTY_EVENT_LISTENER);
14681468
AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false);
14691469
shard.addShardFailureCallback((ig)->failureCallbackTriggered.set(true));
14701470

@@ -2122,6 +2122,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
21222122
null,
21232123
shard.getEngineFactory(),
21242124
shard.getGlobalCheckpointSyncer(),
2125+
shard.getRetentionLeaseSyncer(),
21252126
EMPTY_EVENT_LISTENER);
21262127
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
21272128
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
@@ -2242,6 +2243,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
22422243
wrapper,
22432244
new InternalEngineFactory(),
22442245
() -> {},
2246+
RetentionLeaseSyncer.EMPTY,
22452247
EMPTY_EVENT_LISTENER);
22462248

22472249
recoverShardFromStore(newShard);
@@ -2396,6 +2398,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
23962398
wrapper,
23972399
new InternalEngineFactory(),
23982400
() -> {},
2401+
RetentionLeaseSyncer.EMPTY,
23992402
EMPTY_EVENT_LISTENER);
24002403

24012404
recoverShardFromStore(newShard);
@@ -2962,9 +2965,8 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
29622965
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("true", "checksum")))
29632966
.build();
29642967

2965-
IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData,
2966-
null, null, indexShard.engineFactory,
2967-
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
2968+
IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData, null, null, indexShard.engineFactory,
2969+
indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER);
29682970

29692971
final IndexShardRecoveryException indexShardRecoveryException =
29702972
expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true));
@@ -3007,9 +3009,8 @@ public void testShardDoesNotStartIfCorruptedMarkerIsPresent() throws Exception {
30073009
}
30083010

30093011
// try to start shard on corrupted files
3010-
final IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData,
3011-
null, null, indexShard.engineFactory,
3012-
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
3012+
final IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData, null, null, indexShard.engineFactory,
3013+
indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER);
30133014

30143015
final IndexShardRecoveryException exception1 = expectThrows(IndexShardRecoveryException.class,
30153016
() -> newStartedShard(p -> corruptedShard, true));
@@ -3030,9 +3031,8 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
30303031
assertThat("store has to be marked as corrupted", corruptedMarkerCount.get(), equalTo(1));
30313032

30323033
// try to start another time shard on corrupted files
3033-
final IndexShard corruptedShard2 = newShard(shardRouting, shardPath, indexMetaData,
3034-
null, null, indexShard.engineFactory,
3035-
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
3034+
final IndexShard corruptedShard2 = newShard(shardRouting, shardPath, indexMetaData, null, null, indexShard.engineFactory,
3035+
indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER);
30363036

30373037
final IndexShardRecoveryException exception2 = expectThrows(IndexShardRecoveryException.class,
30383038
() -> newStartedShard(p -> corruptedShard2, true));
@@ -3070,9 +3070,8 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
30703070
.put(indexShard.indexSettings.getSettings())
30713071
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum")))
30723072
.build();
3073-
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
3074-
null, null, indexShard.engineFactory,
3075-
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
3073+
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData, null, null, indexShard.engineFactory,
3074+
indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER);
30763075

30773076
Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata();
30783077
assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1);
@@ -3482,15 +3481,14 @@ public void testFlushOnInactive() throws Exception {
34823481
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
34833482
AtomicBoolean markedInactive = new AtomicBoolean();
34843483
AtomicReference<IndexShard> primaryRef = new AtomicReference<>();
3485-
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null,
3486-
new InternalEngineFactory(), () -> {
3487-
}, new IndexEventListener() {
3488-
@Override
3489-
public void onShardInactive(IndexShard indexShard) {
3490-
markedInactive.set(true);
3491-
primaryRef.get().flush(new FlushRequest());
3492-
}
3493-
});
3484+
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, new InternalEngineFactory(), () -> { },
3485+
RetentionLeaseSyncer.EMPTY, new IndexEventListener() {
3486+
@Override
3487+
public void onShardInactive(IndexShard indexShard) {
3488+
markedInactive.set(true);
3489+
primaryRef.get().flush(new FlushRequest());
3490+
}
3491+
});
34943492
primaryRef.set(primary);
34953493
recoverShardFromStore(primary);
34963494
for (int i = 0; i < 3; i++) {

server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.index.MergePolicyConfig;
4141
import org.elasticsearch.index.engine.EngineException;
4242
import org.elasticsearch.index.engine.InternalEngineFactory;
43+
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
4344
import org.elasticsearch.index.store.Store;
4445
import org.elasticsearch.index.translog.TestTranslog;
4546
import org.elasticsearch.index.translog.TranslogCorruptedException;
@@ -107,11 +108,8 @@ public void setup() throws IOException {
107108
.putMapping("_doc", "{ \"properties\": {} }");
108109
indexMetaData = metaData.build();
109110

110-
indexShard = newStartedShard(p ->
111-
newShard(routing, shardPath, indexMetaData, null, null,
112-
new InternalEngineFactory(), () -> {
113-
}, EMPTY_EVENT_LISTENER),
114-
true);
111+
indexShard = newStartedShard(p -> newShard(routing, shardPath, indexMetaData, null, null,
112+
new InternalEngineFactory(), () -> { }, RetentionLeaseSyncer.EMPTY, EMPTY_EVENT_LISTENER), true);
115113

116114
translogPath = shardPath.resolveTranslog();
117115
indexPath = shardPath.resolveIndex();
@@ -371,8 +369,8 @@ private IndexShard reopenIndexShard(boolean corrupted) throws IOException {
371369
return new Store(shardId, indexSettings, baseDirectoryWrapper, new DummyShardLock(shardId));
372370
};
373371

374-
return newShard(shardRouting, shardPath, metaData, storeProvider, null,
375-
indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
372+
return newShard(shardRouting, shardPath, metaData, storeProvider, null, indexShard.engineFactory,
373+
indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER);
376374
}
377375

378376
private int indexDocs(IndexShard indexShard, boolean flushLast) throws IOException {

server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.env.Environment;
3232
import org.elasticsearch.env.TestEnvironment;
3333
import org.elasticsearch.index.engine.InternalEngineFactory;
34+
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
3435
import org.elasticsearch.index.shard.IndexShard;
3536
import org.elasticsearch.index.shard.IndexShardState;
3637
import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -109,6 +110,7 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException {
109110
null,
110111
new InternalEngineFactory(),
111112
() -> {},
113+
RetentionLeaseSyncer.EMPTY,
112114
EMPTY_EVENT_LISTENER);
113115

114116
// restore the shard

0 commit comments

Comments
 (0)