Skip to content

Commit 185cc47

Browse files
authored
Migrate peer recovery from translog to retention lease (#49448) (#50211)
Since 7.4, we switch from translog to Lucene as the source of history for peer recoveries. However, we reduce the likelihood of operation-based recoveries when performing a full cluster restart from pre-7.4 because existing copies do not have PPRL. To remedy this issue, we fallback using translog in peer recoveries if the recovering replica does not have a peer recovery retention lease, and the replication group hasn't fully migrated to PRRL. Relates #45136
1 parent c95e51c commit 185cc47

File tree

20 files changed

+446
-114
lines changed

20 files changed

+446
-114
lines changed

qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java

+69
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.common.xcontent.XContentBuilder;
3636
import org.elasticsearch.common.xcontent.json.JsonXContent;
3737
import org.elasticsearch.common.xcontent.support.XContentMapValues;
38+
import org.elasticsearch.index.IndexSettings;
3839
import org.elasticsearch.index.seqno.RetentionLeaseUtils;
3940
import org.elasticsearch.rest.action.document.RestBulkAction;
4041
import org.elasticsearch.rest.action.document.RestGetAction;
@@ -1267,6 +1268,12 @@ private void indexRandomDocuments(
12671268
}
12681269
}
12691270

1271+
private void indexDocument(String id) throws IOException {
1272+
final Request indexRequest = new Request("POST", "/" + index + "/" + type + "/" + id);
1273+
indexRequest.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("f", "v").endObject()));
1274+
assertOK(client().performRequest(indexRequest));
1275+
}
1276+
12701277
private int countOfIndexedRandomDocuments() throws IOException {
12711278
return Integer.parseInt(loadInfoDocument("count"));
12721279
}
@@ -1362,4 +1369,66 @@ public void testPeerRecoveryRetentionLeases() throws IOException {
13621369
RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index);
13631370
}
13641371
}
1372+
1373+
/**
1374+
* Tests that with or without soft-deletes, we should perform an operation-based recovery if there were some
1375+
* but not too many uncommitted documents (i.e., less than 10% of committed documents or the extra translog)
1376+
* before we restart the cluster. This is important when we move from translog based to retention leases based
1377+
* peer recoveries.
1378+
*/
1379+
public void testOperationBasedRecovery() throws Exception {
1380+
if (isRunningAgainstOldCluster()) {
1381+
final Settings.Builder settings = Settings.builder()
1382+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
1383+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1);
1384+
if (getOldClusterVersion().onOrAfter(Version.V_6_7_0)) {
1385+
settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean());
1386+
}
1387+
createIndex(index, settings.build());
1388+
ensureGreen(index);
1389+
int committedDocs = randomIntBetween(100, 200);
1390+
for (int i = 0; i < committedDocs; i++) {
1391+
indexDocument(Integer.toString(i));
1392+
if (rarely()) {
1393+
flush(index, randomBoolean());
1394+
}
1395+
}
1396+
flush(index, true);
1397+
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
1398+
// less than 10% of the committed docs (see IndexSetting#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
1399+
int uncommittedDocs = randomIntBetween(0, (int) (committedDocs * 0.1));
1400+
for (int i = 0; i < uncommittedDocs; i++) {
1401+
final String id = Integer.toString(randomIntBetween(1, 100));
1402+
indexDocument(id);
1403+
}
1404+
} else {
1405+
ensureGreen(index);
1406+
assertNoFileBasedRecovery(index, n -> true);
1407+
}
1408+
}
1409+
1410+
/**
1411+
* Verifies that once all shard copies on the new version, we should turn off the translog retention for indices with soft-deletes.
1412+
*/
1413+
public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
1414+
assumeTrue("requires soft-deletes and retention leases", getOldClusterVersion().onOrAfter(Version.V_6_7_0));
1415+
if (isRunningAgainstOldCluster()) {
1416+
createIndex(index, Settings.builder()
1417+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
1418+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
1419+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
1420+
ensureGreen(index);
1421+
int numDocs = randomIntBetween(10, 100);
1422+
for (int i = 0; i < numDocs; i++) {
1423+
indexDocument(Integer.toString(randomIntBetween(1, 100)));
1424+
if (rarely()) {
1425+
flush(index, randomBoolean());
1426+
}
1427+
}
1428+
} else {
1429+
ensureGreen(index);
1430+
flush(index, true);
1431+
assertEmptyTranslog(index);
1432+
}
1433+
}
13651434
}

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

+54-3
Original file line numberDiff line numberDiff line change
@@ -516,10 +516,10 @@ public void testClosedIndexNoopRecovery() throws Exception {
516516
switch (CLUSTER_TYPE) {
517517
case OLD: break;
518518
case MIXED:
519-
assertNoFileBasedRecovery(indexName, s -> s.startsWith(CLUSTER_NAME + "-0"));
519+
assertNoopRecoveries(indexName, s -> s.startsWith(CLUSTER_NAME + "-0"));
520520
break;
521521
case UPGRADED:
522-
assertNoFileBasedRecovery(indexName, s -> s.startsWith(CLUSTER_NAME));
522+
assertNoopRecoveries(indexName, s -> s.startsWith(CLUSTER_NAME));
523523
break;
524524
}
525525
}
@@ -692,7 +692,7 @@ public void testUpdateDoc() throws Exception {
692692
}
693693
}
694694

695-
private void assertNoFileBasedRecovery(String indexName, Predicate<String> targetNode) throws IOException {
695+
private void assertNoopRecoveries(String indexName, Predicate<String> targetNode) throws IOException {
696696
Map<String, Object> recoveries = entityAsMap(client()
697697
.performRequest(new Request("GET", indexName + "/_recovery?detailed=true")));
698698

@@ -723,4 +723,55 @@ private void assertNoFileBasedRecovery(String indexName, Predicate<String> targe
723723

724724
assertTrue("must find replica", foundReplica);
725725
}
726+
727+
/**
728+
* Tests that with or without soft-deletes, we should perform an operation-based recovery if there were some
729+
* but not too many uncommitted documents (i.e., less than 10% of committed documents or the extra translog)
730+
* before we upgrade each node. This is important when we move from translog based to retention leases based
731+
* peer recoveries.
732+
*/
733+
public void testOperationBasedRecovery() throws Exception {
734+
final String index = "test_operation_based_recovery";
735+
if (CLUSTER_TYPE == ClusterType.OLD) {
736+
createIndex(index, Settings.builder()
737+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
738+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)
739+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()).build());
740+
ensureGreen(index);
741+
indexDocs(index, 0, randomIntBetween(100, 200));
742+
flush(index, randomBoolean());
743+
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
744+
// uncommitted docs must be less than 10% of committed docs (see IndexSetting#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
745+
indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
746+
} else {
747+
ensureGreen(index);
748+
assertNoFileBasedRecovery(index, nodeName ->
749+
CLUSTER_TYPE == ClusterType.UPGRADED
750+
|| nodeName.startsWith(CLUSTER_NAME + "-0")
751+
|| (nodeName.startsWith(CLUSTER_NAME + "-1") && Booleans.parseBoolean(System.getProperty("tests.first_round")) == false));
752+
indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
753+
}
754+
}
755+
756+
/**
757+
* Verifies that once all shard copies on the new version, we should turn off the translog retention for indices with soft-deletes.
758+
*/
759+
public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
760+
final String index = "turn_off_translog_retention";
761+
if (CLUSTER_TYPE == ClusterType.OLD) {
762+
createIndex(index, Settings.builder()
763+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
764+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 2))
765+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
766+
ensureGreen(index);
767+
indexDocs(index, 0, randomIntBetween(100, 200));
768+
flush(index, randomBoolean());
769+
indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 100));
770+
}
771+
if (CLUSTER_TYPE == ClusterType.UPGRADED) {
772+
ensureGreen(index);
773+
flush(index, true);
774+
assertEmptyTranslog(index);
775+
}
776+
}
726777
}

server/src/main/java/org/elasticsearch/index/IndexSettings.java

+15-9
Original file line numberDiff line numberDiff line change
@@ -252,21 +252,22 @@ public final class IndexSettings {
252252
* Controls how long translog files that are no longer needed for persistence reasons
253253
* will be kept around before being deleted. Keeping more files is useful to increase
254254
* the chance of ops based recoveries for indices with soft-deletes disabled.
255-
* This setting will be ignored if soft-deletes is enabled.
255+
* This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4).
256256
**/
257257
public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
258258
Setting.timeSetting("index.translog.retention.age",
259-
settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? TimeValue.MINUS_ONE : TimeValue.timeValueHours(12), TimeValue.MINUS_ONE,
260-
Property.Dynamic, Property.IndexScope);
259+
settings -> shouldDisableTranslogRetention(settings) ? TimeValue.MINUS_ONE : TimeValue.timeValueHours(12),
260+
TimeValue.MINUS_ONE, Property.Dynamic, Property.IndexScope);
261261

262262
/**
263263
* Controls how many translog files that are no longer needed for persistence reasons
264264
* will be kept around before being deleted. Keeping more files is useful to increase
265265
* the chance of ops based recoveries for indices with soft-deletes disabled.
266-
* This setting will be ignored if soft-deletes is enabled.
266+
* This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4).
267267
**/
268268
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
269-
Setting.byteSizeSetting("index.translog.retention.size", settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? "-1" : "512MB",
269+
Setting.byteSizeSetting("index.translog.retention.size",
270+
settings -> shouldDisableTranslogRetention(settings) ? "-1" : "512MB",
270271
Property.Dynamic, Property.IndexScope);
271272

272273
/**
@@ -587,7 +588,7 @@ private void setFlushAfterMergeThresholdSize(ByteSizeValue byteSizeValue) {
587588
}
588589

589590
private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
590-
if (softDeleteEnabled && byteSizeValue.getBytes() >= 0) {
591+
if (shouldDisableTranslogRetention(settings) && byteSizeValue.getBytes() >= 0) {
591592
// ignore the translog retention settings if soft-deletes enabled
592593
this.translogRetentionSize = new ByteSizeValue(-1);
593594
} else {
@@ -596,7 +597,7 @@ private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
596597
}
597598

598599
private void setTranslogRetentionAge(TimeValue age) {
599-
if (softDeleteEnabled && age.millis() >= 0) {
600+
if (shouldDisableTranslogRetention(settings) && age.millis() >= 0) {
600601
// ignore the translog retention settings if soft-deletes enabled
601602
this.translogRetentionAge = TimeValue.MINUS_ONE;
602603
} else {
@@ -784,7 +785,7 @@ public TimeValue getRefreshInterval() {
784785
* Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
785786
*/
786787
public ByteSizeValue getTranslogRetentionSize() {
787-
assert softDeleteEnabled == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize;
788+
assert shouldDisableTranslogRetention(settings) == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize;
788789
return translogRetentionSize;
789790
}
790791

@@ -793,7 +794,7 @@ public ByteSizeValue getTranslogRetentionSize() {
793794
* around
794795
*/
795796
public TimeValue getTranslogRetentionAge() {
796-
assert softDeleteEnabled == false || translogRetentionAge.millis() == -1L : translogRetentionSize;
797+
assert shouldDisableTranslogRetention(settings) == false || translogRetentionAge.millis() == -1L : translogRetentionSize;
797798
return translogRetentionAge;
798799
}
799800

@@ -805,6 +806,11 @@ public int getTranslogRetentionTotalFiles() {
805806
return INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.get(getSettings());
806807
}
807808

809+
private static boolean shouldDisableTranslogRetention(Settings settings) {
810+
return INDEX_SOFT_DELETES_SETTING.get(settings)
811+
&& IndexMetaData.SETTING_INDEX_VERSION_CREATED.get(settings).onOrAfter(Version.V_7_4_0);
812+
}
813+
808814
/**
809815
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
810816
* be preserved for rollback purposes, we want to keep the size of individual generations from

server/src/main/java/org/elasticsearch/index/engine/Engine.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
6767
import org.elasticsearch.common.metrics.CounterMetric;
6868
import org.elasticsearch.common.regex.Regex;
69+
import org.elasticsearch.common.unit.ByteSizeValue;
6970
import org.elasticsearch.common.unit.TimeValue;
7071
import org.elasticsearch.common.util.concurrent.ReleasableLock;
7172
import org.elasticsearch.index.VersionType;
@@ -729,7 +730,7 @@ public enum SearcherScope {
729730
/**
730731
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
731732
*/
732-
public abstract Closeable acquireRetentionLock();
733+
public abstract Closeable acquireHistoryRetentionLock(HistorySource historySource);
733734

734735
/**
735736
* Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive).
@@ -742,19 +743,20 @@ public abstract Translog.Snapshot newChangesSnapshot(String source, MapperServic
742743
* Creates a new history snapshot for reading operations since {@code startingSeqNo} (inclusive).
743744
* The returned snapshot can be retrieved from either Lucene index or translog files.
744745
*/
745-
public abstract Translog.Snapshot readHistoryOperations(String source,
746-
MapperService mapperService, long startingSeqNo) throws IOException;
746+
public abstract Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource,
747+
MapperService mapperService, long startingSeqNo) throws IOException;
747748

748749
/**
749750
* Returns the estimated number of history operations whose seq# at least {@code startingSeqNo}(inclusive) in this engine.
750751
*/
751-
public abstract int estimateNumberOfHistoryOperations(String source,
752-
MapperService mapperService, long startingSeqNo) throws IOException;
752+
public abstract int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
753+
MapperService mapperService, long startingSeqNo) throws IOException;
753754

754755
/**
755756
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
756757
*/
757-
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;
758+
public abstract boolean hasCompleteOperationHistory(String reason, HistorySource historySource,
759+
MapperService mapperService, long startingSeqNo) throws IOException;
758760

759761
/**
760762
* Gets the minimum retained sequence number for this engine.
@@ -1816,7 +1818,8 @@ public IndexCommit getIndexCommit() {
18161818
}
18171819
}
18181820

1819-
public void onSettingsChanged() {
1821+
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
1822+
18201823
}
18211824

18221825
/**
@@ -1950,4 +1953,11 @@ public interface TranslogRecoveryRunner {
19501953
* to advance this marker to at least the given sequence number.
19511954
*/
19521955
public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary);
1956+
1957+
/**
1958+
* Whether we should read history operations from translog or Lucene index
1959+
*/
1960+
public enum HistorySource {
1961+
TRANSLOG, INDEX
1962+
}
19531963
}

0 commit comments

Comments
 (0)