Skip to content

Commit 9ff320d

Browse files
authored
Use index for peer recovery instead of translog (#45137)
Today we recover a replica by copying operations from the primary's translog. However we also retain some historical operations in the index itself, as long as soft-deletes are enabled. This commit adjusts peer recovery to use the operations in the index for recovery rather than those in the translog, and ensures that the replication group retains enough history for use in peer recovery by means of retention leases. Reverts #38904 and #42211 Relates #41536 Backport of #45136 to 7.x.
1 parent 3366726 commit 9ff320d

File tree

46 files changed

+2508
-551
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2508
-551
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.elasticsearch.common.xcontent.XContentHelper;
5555
import org.elasticsearch.common.xcontent.XContentType;
5656
import org.elasticsearch.common.xcontent.json.JsonXContent;
57+
import org.elasticsearch.index.seqno.ReplicationTracker;
5758
import org.elasticsearch.test.rest.yaml.ObjectPath;
5859
import org.junit.Before;
5960

@@ -260,7 +261,9 @@ public void testForgetFollower() throws IOException {
260261
final Map<?, ?> shardStatsAsMap = (Map<?, ?>) shardStats.get(0);
261262
final Map<?, ?> retentionLeasesStats = (Map<?, ?>) shardStatsAsMap.get("retention_leases");
262263
final List<?> leases = (List<?>) retentionLeasesStats.get("leases");
263-
assertThat(leases, empty());
264+
for (final Object lease : leases) {
265+
assertThat(((Map<?, ?>) lease).get("source"), equalTo(ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE));
266+
}
264267
}
265268
}
266269

qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java

+29-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.common.xcontent.XContentBuilder;
3636
import org.elasticsearch.common.xcontent.json.JsonXContent;
3737
import org.elasticsearch.common.xcontent.support.XContentMapValues;
38+
import org.elasticsearch.index.seqno.RetentionLeaseUtils;
3839
import org.elasticsearch.rest.action.document.RestBulkAction;
3940
import org.elasticsearch.rest.action.document.RestGetAction;
4041
import org.elasticsearch.rest.action.document.RestIndexAction;
@@ -89,7 +90,7 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
8990
private String type;
9091

9192
@Before
92-
public void setIndex() throws IOException {
93+
public void setIndex() {
9394
index = getTestName().toLowerCase(Locale.ROOT);
9495
}
9596

@@ -1338,4 +1339,31 @@ protected void ensureGreenLongWait(String index) throws IOException {
13381339
assertEquals("green", healthRsp.get("status"));
13391340
assertFalse((Boolean) healthRsp.get("timed_out"));
13401341
}
1342+
1343+
public void testPeerRecoveryRetentionLeases() throws IOException {
1344+
assumeTrue(getOldClusterVersion() + " does not support soft deletes", getOldClusterVersion().onOrAfter(Version.V_6_5_0));
1345+
if (isRunningAgainstOldCluster()) {
1346+
XContentBuilder settings = jsonBuilder();
1347+
settings.startObject();
1348+
{
1349+
settings.startObject("settings");
1350+
settings.field("number_of_shards", between(1, 5));
1351+
settings.field("number_of_replicas", between(0, 1));
1352+
if (randomBoolean() || getOldClusterVersion().before(Version.V_7_0_0)) {
1353+
// this is the default after v7.0.0, but is required before that
1354+
settings.field("soft_deletes.enabled", true);
1355+
}
1356+
settings.endObject();
1357+
}
1358+
settings.endObject();
1359+
1360+
Request createIndex = new Request("PUT", "/" + index);
1361+
createIndex.setJsonEntity(Strings.toString(settings));
1362+
client().performRequest(createIndex);
1363+
ensureGreen(index);
1364+
} else {
1365+
ensureGreen(index);
1366+
RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index);
1367+
}
1368+
}
13411369
}

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

+75
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3333
import org.elasticsearch.common.xcontent.support.XContentMapValues;
3434
import org.elasticsearch.index.IndexSettings;
35+
import org.elasticsearch.index.seqno.RetentionLeaseUtils;
3536
import org.elasticsearch.rest.RestStatus;
3637
import org.elasticsearch.rest.action.document.RestIndexAction;
3738
import org.elasticsearch.rest.action.document.RestUpdateAction;
@@ -384,6 +385,80 @@ public void testRecoveryWithSoftDeletes() throws Exception {
384385
ensureGreen(index);
385386
}
386387

388+
public void testRetentionLeasesEstablishedWhenPromotingPrimary() throws Exception {
389+
final String index = "recover_and_create_leases_in_promotion";
390+
if (CLUSTER_TYPE == ClusterType.OLD) {
391+
Settings.Builder settings = Settings.builder()
392+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5))
393+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(1, 2)) // triggers nontrivial promotion
394+
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
395+
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
396+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
397+
createIndex(index, settings.build());
398+
int numDocs = randomInt(10);
399+
indexDocs(index, 0, numDocs);
400+
if (randomBoolean()) {
401+
client().performRequest(new Request("POST", "/" + index + "/_flush"));
402+
}
403+
}
404+
ensureGreen(index);
405+
if (CLUSTER_TYPE == ClusterType.UPGRADED) {
406+
assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index));
407+
}
408+
}
409+
410+
public void testRetentionLeasesEstablishedWhenRelocatingPrimary() throws Exception {
411+
final String index = "recover_and_create_leases_in_relocation";
412+
switch (CLUSTER_TYPE) {
413+
case OLD:
414+
Settings.Builder settings = Settings.builder()
415+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5))
416+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(0, 1))
417+
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
418+
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
419+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
420+
createIndex(index, settings.build());
421+
int numDocs = randomInt(10);
422+
indexDocs(index, 0, numDocs);
423+
if (randomBoolean()) {
424+
client().performRequest(new Request("POST", "/" + index + "/_flush"));
425+
}
426+
ensureGreen(index);
427+
break;
428+
429+
case MIXED:
430+
// trigger a primary relocation by excluding the last old node with a shard filter
431+
final Map<?, ?> nodesMap
432+
= ObjectPath.createFromResponse(client().performRequest(new Request("GET", "/_nodes"))).evaluate("nodes");
433+
final List<String> oldNodeNames = new ArrayList<>();
434+
for (Object nodeDetails : nodesMap.values()) {
435+
final Map<?, ?> nodeDetailsMap = (Map<?, ?>) nodeDetails;
436+
final String versionString = (String) nodeDetailsMap.get("version");
437+
if (versionString.equals(Version.CURRENT.toString()) == false) {
438+
oldNodeNames.add((String) nodeDetailsMap.get("name"));
439+
}
440+
}
441+
442+
if (oldNodeNames.size() == 1) {
443+
final String oldNodeName = oldNodeNames.get(0);
444+
logger.info("--> excluding index [{}] from node [{}]", index, oldNodeName);
445+
final Request putSettingsRequest = new Request("PUT", "/" + index + "/_settings");
446+
putSettingsRequest.setJsonEntity("{\"index.routing.allocation.exclude._name\":\"" + oldNodeName + "\"}");
447+
assertOK(client().performRequest(putSettingsRequest));
448+
ensureGreen(index);
449+
assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index));
450+
} else {
451+
ensureGreen(index);
452+
}
453+
break;
454+
455+
case UPGRADED:
456+
ensureGreen(index);
457+
assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index));
458+
break;
459+
}
460+
}
461+
387462
/**
388463
* This test creates an index in the non upgraded cluster and closes it. It then checks that the index
389464
* is effectively closed and potentially replicated (if the version the index was created on supports

server/src/main/java/org/elasticsearch/index/engine/Engine.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,7 @@ public abstract int estimateNumberOfHistoryOperations(String source,
747747
MapperService mapperService, long startingSeqNo) throws IOException;
748748

749749
/**
750-
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its translog
750+
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
751751
*/
752752
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;
753753

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+19-11
Original file line numberDiff line numberDiff line change
@@ -518,18 +518,30 @@ public void syncTranslog() throws IOException {
518518
}
519519

520520
/**
521-
* Creates a new history snapshot for reading operations since the provided seqno from the translog.
521+
* Creates a new history snapshot for reading operations since the provided seqno.
522+
* The returned snapshot can be retrieved from either Lucene index or translog files.
522523
*/
523524
@Override
524525
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
526+
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
527+
return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
528+
}
529+
525530
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
526531
}
527532

528533
/**
529534
* Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
530535
*/
531536
@Override
532-
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) {
537+
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
538+
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
539+
try (Translog.Snapshot snapshot = newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo),
540+
Long.MAX_VALUE, false)) {
541+
return snapshot.totalOperations();
542+
}
543+
}
544+
533545
return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
534546
}
535547

@@ -2579,6 +2591,10 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS
25792591

25802592
@Override
25812593
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
2594+
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
2595+
return getMinRetainedSeqNo() <= startingSeqNo;
2596+
}
2597+
25822598
final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
25832599
// avoid scanning translog if not necessary
25842600
if (startingSeqNo > currentLocalCheckpoint) {
@@ -2608,15 +2624,7 @@ public final long getMinRetainedSeqNo() {
26082624
@Override
26092625
public Closeable acquireRetentionLock() {
26102626
if (softDeleteEnabled) {
2611-
final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock();
2612-
final Closeable translogRetentionLock;
2613-
try {
2614-
translogRetentionLock = translog.acquireRetentionLock();
2615-
} catch (Exception e) {
2616-
softDeletesRetentionLock.close();
2617-
throw e;
2618-
}
2619-
return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock);
2627+
return softDeletesPolicy.acquireRetentionLock();
26202628
} else {
26212629
return translog.acquireRetentionLock();
26222630
}

0 commit comments

Comments
 (0)