Skip to content

Commit b9fbc8d

Browse files
authored
Migrate peer recovery from translog to retention lease (#49448)
Since 7.4, we have switched from translog to Lucene as the source of history for peer recoveries. However, this reduces the likelihood of operation-based recoveries when performing a full cluster restart from pre-7.4, because existing copies do not have peer recovery retention leases (PRRL). To remedy this issue, we fall back to using the translog in peer recoveries if the recovering replica does not have a peer recovery retention lease and the replication group hasn't fully migrated to PRRL. Relates #45136
1 parent 6749b63 commit b9fbc8d

File tree

20 files changed

+439
-114
lines changed

20 files changed

+439
-114
lines changed

qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java

+66
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.common.xcontent.XContentBuilder;
3535
import org.elasticsearch.common.xcontent.json.JsonXContent;
3636
import org.elasticsearch.common.xcontent.support.XContentMapValues;
37+
import org.elasticsearch.index.IndexSettings;
3738
import org.elasticsearch.index.seqno.RetentionLeaseUtils;
3839
import org.elasticsearch.test.NotEqualMessageBuilder;
3940
import org.elasticsearch.test.rest.ESRestTestCase;
@@ -1168,6 +1169,12 @@ private void indexRandomDocuments(
11681169
}
11691170
}
11701171

1172+
private void indexDocument(String id) throws IOException {
1173+
final Request indexRequest = new Request("POST", "/" + index + "/" + "_doc/" + id);
1174+
indexRequest.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("f", "v").endObject()));
1175+
assertOK(client().performRequest(indexRequest));
1176+
}
1177+
11711178
private int countOfIndexedRandomDocuments() throws IOException {
11721179
return Integer.parseInt(loadInfoDocument(index + "_count"));
11731180
}
@@ -1248,4 +1255,63 @@ public void testPeerRecoveryRetentionLeases() throws IOException {
12481255
RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index);
12491256
}
12501257
}
1258+
1259+
/**
1260+
* Tests that with or without soft-deletes, we should perform an operation-based recovery if there were some
1261+
* but not too many uncommitted documents (i.e., less than 10% of committed documents or the extra translog)
1262+
* before we restart the cluster. This is important when we move from translog based to retention leases based
1263+
* peer recoveries.
1264+
*/
1265+
public void testOperationBasedRecovery() throws Exception {
1266+
if (isRunningAgainstOldCluster()) {
1267+
createIndex(index, Settings.builder()
1268+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
1269+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
1270+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
1271+
.build());
1272+
ensureGreen(index);
1273+
int committedDocs = randomIntBetween(100, 200);
1274+
for (int i = 0; i < committedDocs; i++) {
1275+
indexDocument(Integer.toString(i));
1276+
if (rarely()) {
1277+
flush(index, randomBoolean());
1278+
}
1279+
}
1280+
flush(index, true);
1281+
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
1282+
// less than 10% of the committed docs (see IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
1283+
int uncommittedDocs = randomIntBetween(0, (int) (committedDocs * 0.1));
1284+
for (int i = 0; i < uncommittedDocs; i++) {
1285+
final String id = Integer.toString(randomIntBetween(1, 100));
1286+
indexDocument(id);
1287+
}
1288+
} else {
1289+
ensureGreen(index);
1290+
assertNoFileBasedRecovery(index, n -> true);
1291+
}
1292+
}
1293+
1294+
/**
1295+
* Verifies that once all shard copies are on the new version, we turn off the translog retention for indices with soft-deletes.
1296+
*/
1297+
public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
1298+
if (isRunningAgainstOldCluster()) {
1299+
createIndex(index, Settings.builder()
1300+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
1301+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
1302+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
1303+
ensureGreen(index);
1304+
int numDocs = randomIntBetween(10, 100);
1305+
for (int i = 0; i < numDocs; i++) {
1306+
indexDocument(Integer.toString(randomIntBetween(1, 100)));
1307+
if (rarely()) {
1308+
flush(index, randomBoolean());
1309+
}
1310+
}
1311+
} else {
1312+
ensureGreen(index);
1313+
flush(index, true);
1314+
assertEmptyTranslog(index);
1315+
}
1316+
}
12511317
}

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

+54-3
Original file line numberDiff line numberDiff line change
@@ -487,10 +487,10 @@ public void testClosedIndexNoopRecovery() throws Exception {
487487
switch (CLUSTER_TYPE) {
488488
case OLD: break;
489489
case MIXED:
490-
assertNoFileBasedRecovery(indexName, s -> s.startsWith(CLUSTER_NAME + "-0"));
490+
assertNoopRecoveries(indexName, s -> s.startsWith(CLUSTER_NAME + "-0"));
491491
break;
492492
case UPGRADED:
493-
assertNoFileBasedRecovery(indexName, s -> s.startsWith(CLUSTER_NAME));
493+
assertNoopRecoveries(indexName, s -> s.startsWith(CLUSTER_NAME));
494494
break;
495495
}
496496
}
@@ -647,7 +647,7 @@ public void testUpdateDoc() throws Exception {
647647
}
648648
}
649649

650-
private void assertNoFileBasedRecovery(String indexName, Predicate<String> targetNode) throws IOException {
650+
private void assertNoopRecoveries(String indexName, Predicate<String> targetNode) throws IOException {
651651
Map<String, Object> recoveries = entityAsMap(client()
652652
.performRequest(new Request("GET", indexName + "/_recovery?detailed=true")));
653653

@@ -678,4 +678,55 @@ private void assertNoFileBasedRecovery(String indexName, Predicate<String> targe
678678

679679
assertTrue("must find replica", foundReplica);
680680
}
681+
682+
/**
683+
* Tests that with or without soft-deletes, we should perform an operation-based recovery if there were some
684+
* but not too many uncommitted documents (i.e., less than 10% of committed documents or the extra translog)
685+
* before we upgrade each node. This is important when we move from translog based to retention leases based
686+
* peer recoveries.
687+
*/
688+
public void testOperationBasedRecovery() throws Exception {
689+
final String index = "test_operation_based_recovery";
690+
if (CLUSTER_TYPE == ClusterType.OLD) {
691+
createIndex(index, Settings.builder()
692+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
693+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)
694+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()).build());
695+
ensureGreen(index);
696+
indexDocs(index, 0, randomIntBetween(100, 200));
697+
flush(index, randomBoolean());
698+
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
699+
// uncommitted docs must be less than 10% of committed docs (see IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
700+
indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
701+
} else {
702+
ensureGreen(index);
703+
assertNoFileBasedRecovery(index, nodeName ->
704+
CLUSTER_TYPE == ClusterType.UPGRADED
705+
|| nodeName.startsWith(CLUSTER_NAME + "-0")
706+
|| (nodeName.startsWith(CLUSTER_NAME + "-1") && Booleans.parseBoolean(System.getProperty("tests.first_round")) == false));
707+
indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
708+
}
709+
}
710+
711+
/**
712+
* Verifies that once all shard copies are on the new version, we turn off the translog retention for indices with soft-deletes.
713+
*/
714+
public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
715+
final String index = "turn_off_translog_retention";
716+
if (CLUSTER_TYPE == ClusterType.OLD) {
717+
createIndex(index, Settings.builder()
718+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
719+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 2))
720+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
721+
ensureGreen(index);
722+
indexDocs(index, 0, randomIntBetween(100, 200));
723+
flush(index, randomBoolean());
724+
indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 100));
725+
}
726+
if (CLUSTER_TYPE == ClusterType.UPGRADED) {
727+
ensureGreen(index);
728+
flush(index, true);
729+
assertEmptyTranslog(index);
730+
}
731+
}
681732
}

server/src/main/java/org/elasticsearch/index/IndexSettings.java

+15-9
Original file line numberDiff line numberDiff line change
@@ -245,21 +245,22 @@ public final class IndexSettings {
245245
* Controls how long translog files that are no longer needed for persistence reasons
246246
* will be kept around before being deleted. Keeping more files is useful to increase
247247
* the chance of ops based recoveries for indices with soft-deletes disabled.
248-
* This setting will be ignored if soft-deletes is enabled.
248+
* This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4).
249249
**/
250250
public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
251251
Setting.timeSetting("index.translog.retention.age",
252-
settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? TimeValue.MINUS_ONE : TimeValue.timeValueHours(12), TimeValue.MINUS_ONE,
253-
Property.Dynamic, Property.IndexScope);
252+
settings -> shouldDisableTranslogRetention(settings) ? TimeValue.MINUS_ONE : TimeValue.timeValueHours(12),
253+
TimeValue.MINUS_ONE, Property.Dynamic, Property.IndexScope);
254254

255255
/**
256256
* Controls how many translog files that are no longer needed for persistence reasons
257257
* will be kept around before being deleted. Keeping more files is useful to increase
258258
* the chance of ops based recoveries for indices with soft-deletes disabled.
259-
* This setting will be ignored if soft-deletes is enabled.
259+
* This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4).
260260
**/
261261
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
262-
Setting.byteSizeSetting("index.translog.retention.size", settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? "-1" : "512MB",
262+
Setting.byteSizeSetting("index.translog.retention.size",
263+
settings -> shouldDisableTranslogRetention(settings) ? "-1" : "512MB",
263264
Property.Dynamic, Property.IndexScope);
264265

265266
/**
@@ -577,7 +578,7 @@ private void setFlushAfterMergeThresholdSize(ByteSizeValue byteSizeValue) {
577578
}
578579

579580
private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
580-
if (softDeleteEnabled && byteSizeValue.getBytes() >= 0) {
581+
if (shouldDisableTranslogRetention(settings) && byteSizeValue.getBytes() >= 0) {
581582
// ignore the translog retention settings if soft-deletes is enabled and the index was created on or after 7.4.0
582583
this.translogRetentionSize = new ByteSizeValue(-1);
583584
} else {
@@ -586,7 +587,7 @@ private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
586587
}
587588

588589
private void setTranslogRetentionAge(TimeValue age) {
589-
if (softDeleteEnabled && age.millis() >= 0) {
590+
if (shouldDisableTranslogRetention(settings) && age.millis() >= 0) {
590591
// ignore the translog retention settings if soft-deletes is enabled and the index was created on or after 7.4.0
591592
this.translogRetentionAge = TimeValue.MINUS_ONE;
592593
} else {
@@ -774,7 +775,7 @@ public TimeValue getRefreshInterval() {
774775
* Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
775776
*/
776777
public ByteSizeValue getTranslogRetentionSize() {
777-
assert softDeleteEnabled == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize;
778+
assert shouldDisableTranslogRetention(settings) == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize;
778779
return translogRetentionSize;
779780
}
780781

@@ -783,7 +784,7 @@ public ByteSizeValue getTranslogRetentionSize() {
783784
* around
784785
*/
785786
public TimeValue getTranslogRetentionAge() {
786-
assert softDeleteEnabled == false || translogRetentionAge.millis() == -1L : translogRetentionSize;
787+
assert shouldDisableTranslogRetention(settings) == false || translogRetentionAge.millis() == -1L : translogRetentionAge; // report the age (the value being checked), not the size, when the invariant fails
787788
return translogRetentionAge;
788789
}
789790

@@ -795,6 +796,11 @@ public int getTranslogRetentionTotalFiles() {
795796
return INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.get(getSettings());
796797
}
797798

799+
private static boolean shouldDisableTranslogRetention(Settings settings) {
800+
return INDEX_SOFT_DELETES_SETTING.get(settings)
801+
&& IndexMetaData.SETTING_INDEX_VERSION_CREATED.get(settings).onOrAfter(Version.V_7_4_0);
802+
}
803+
798804
/**
799805
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
800806
* be preserved for rollback purposes, we want to keep the size of individual generations from

server/src/main/java/org/elasticsearch/index/engine/Engine.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
6767
import org.elasticsearch.common.metrics.CounterMetric;
6868
import org.elasticsearch.common.regex.Regex;
69+
import org.elasticsearch.common.unit.ByteSizeValue;
6970
import org.elasticsearch.common.unit.TimeValue;
7071
import org.elasticsearch.common.util.concurrent.ReleasableLock;
7172
import org.elasticsearch.index.VersionType;
@@ -729,7 +730,7 @@ public enum SearcherScope {
729730
/**
730731
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
731732
*/
732-
public abstract Closeable acquireRetentionLock();
733+
public abstract Closeable acquireHistoryRetentionLock(HistorySource historySource);
733734

734735
/**
735736
* Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive).
@@ -742,19 +743,20 @@ public abstract Translog.Snapshot newChangesSnapshot(String source, MapperServic
742743
* Creates a new history snapshot for reading operations since {@code startingSeqNo} (inclusive).
743744
* The returned snapshot can be retrieved from either Lucene index or translog files.
744745
*/
745-
public abstract Translog.Snapshot readHistoryOperations(String source,
746-
MapperService mapperService, long startingSeqNo) throws IOException;
746+
public abstract Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource,
747+
MapperService mapperService, long startingSeqNo) throws IOException;
747748

748749
/**
749750
* Returns the estimated number of history operations whose seq# at least {@code startingSeqNo}(inclusive) in this engine.
750751
*/
751-
public abstract int estimateNumberOfHistoryOperations(String source,
752-
MapperService mapperService, long startingSeqNo) throws IOException;
752+
public abstract int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
753+
MapperService mapperService, long startingSeqNo) throws IOException;
753754

754755
/**
755756
* Checks if this engine has every operation since {@code startingSeqNo} (inclusive) in its history (either Lucene or translog)
756757
*/
757-
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;
758+
public abstract boolean hasCompleteOperationHistory(String reason, HistorySource historySource,
759+
MapperService mapperService, long startingSeqNo) throws IOException;
758760

759761
/**
760762
* Gets the minimum retained sequence number for this engine.
@@ -1795,7 +1797,8 @@ public IndexCommit getIndexCommit() {
17951797
}
17961798
}
17971799

1798-
public void onSettingsChanged() {
1800+
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
1801+
17991802
}
18001803

18011804
/**
@@ -1929,4 +1932,11 @@ public interface TranslogRecoveryRunner {
19291932
* to advance this marker to at least the given sequence number.
19301933
*/
19311934
public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary);
1935+
1936+
/**
1937+
* Whether we should read history operations from translog or Lucene index
1938+
*/
1939+
public enum HistorySource {
1940+
TRANSLOG, INDEX
1941+
}
19321942
}

0 commit comments

Comments
 (0)