Skip to content

Commit 5322b00

Browse files
authored
Recover peers using history from Lucene (#44853)
Thanks to peer recovery retention leases we now retain the history needed to perform peer recoveries from the index instead of from the translog. This commit adjusts the peer recovery process to do so, and also adjusts it to use the existence of a retention lease to decide whether or not to attempt an operations-based recovery. Reverts #38904 and #42211 Relates #41536
1 parent 6960cf7 commit 5322b00

13 files changed

+557
-92
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,7 @@ public abstract int estimateNumberOfHistoryOperations(String source,
747747
MapperService mapperService, long startingSeqNo) throws IOException;
748748

749749
/**
750-
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its translog
750+
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
751751
*/
752752
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;
753753

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+19-11
Original file line numberDiff line numberDiff line change
@@ -514,18 +514,30 @@ public void syncTranslog() throws IOException {
514514
}
515515

516516
/**
517-
* Creates a new history snapshot for reading operations since the provided seqno from the translog.
517+
* Creates a new history snapshot for reading operations since the provided seqno.
518+
* The returned snapshot can be retrieved from either Lucene index or translog files.
518519
*/
519520
@Override
520521
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
522+
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
523+
return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
524+
}
525+
521526
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
522527
}
523528

524529
/**
525530
* Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
526531
*/
527532
@Override
528-
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) {
533+
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
534+
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
535+
try (Translog.Snapshot snapshot = newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo),
536+
Long.MAX_VALUE, false)) {
537+
return snapshot.totalOperations();
538+
}
539+
}
540+
529541
return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
530542
}
531543

@@ -2573,6 +2585,10 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS
25732585

25742586
@Override
25752587
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
2588+
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
2589+
return getMinRetainedSeqNo() <= startingSeqNo;
2590+
}
2591+
25762592
final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
25772593
// avoid scanning translog if not necessary
25782594
if (startingSeqNo > currentLocalCheckpoint) {
@@ -2602,15 +2618,7 @@ public final long getMinRetainedSeqNo() {
26022618
@Override
26032619
public Closeable acquireRetentionLock() {
26042620
if (softDeleteEnabled) {
2605-
final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock();
2606-
final Closeable translogRetentionLock;
2607-
try {
2608-
translogRetentionLock = translog.acquireRetentionLock();
2609-
} catch (Exception e) {
2610-
softDeletesRetentionLock.close();
2611-
throw e;
2612-
}
2613-
return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock);
2621+
return softDeletesPolicy.acquireRetentionLock();
26142622
} else {
26152623
return translog.acquireRetentionLock();
26162624
}

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

+40-2
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,36 @@ public RetentionLease addRetentionLease(
290290
return retentionLease;
291291
}
292292

293+
/**
294+
* Atomically clones an existing retention lease to a new ID.
295+
*
296+
* @param sourceLeaseId the identifier of the source retention lease
297+
* @param targetLeaseId the identifier of the retention lease to create
298+
* @param listener the callback when the retention lease is successfully added and synced to replicas
299+
* @return the new retention lease
300+
* @throws RetentionLeaseNotFoundException if the specified source retention lease does not exist
301+
* @throws RetentionLeaseAlreadyExistsException if the specified target retention lease already exists
302+
*/
303+
RetentionLease cloneRetentionLease(String sourceLeaseId, String targetLeaseId, ActionListener<ReplicationResponse> listener) {
304+
Objects.requireNonNull(listener);
305+
final RetentionLease retentionLease;
306+
final RetentionLeases currentRetentionLeases;
307+
synchronized (this) {
308+
assert primaryMode;
309+
if (getRetentionLeases().contains(sourceLeaseId) == false) {
310+
throw new RetentionLeaseNotFoundException(sourceLeaseId);
311+
}
312+
final RetentionLease sourceLease = getRetentionLeases().get(sourceLeaseId);
313+
retentionLease = innerAddRetentionLease(targetLeaseId, sourceLease.retainingSequenceNumber(), sourceLease.source());
314+
currentRetentionLeases = retentionLeases;
315+
}
316+
317+
// Syncing here may not be strictly necessary, because this new lease isn't retaining any extra history that wasn't previously
318+
// retained by the source lease; however we prefer to sync anyway since we expect to do so whenever creating a new lease.
319+
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
320+
return retentionLease;
321+
}
322+
293323
/**
294324
* Adds a new retention lease, but does not synchronise it with the rest of the replication group.
295325
*
@@ -442,8 +472,16 @@ public boolean assertRetentionLeasesPersisted(final Path path) throws IOExceptio
442472
* containing the persistent node ID calculated by {@link ReplicationTracker#getPeerRecoveryRetentionLeaseId}, and retain operations
443473
* with sequence numbers strictly greater than the given global checkpoint.
444474
*/
445-
public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener) {
446-
addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener);
475+
public RetentionLease addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
476+
ActionListener<ReplicationResponse> listener) {
477+
return addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1,
478+
PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener);
479+
}
480+
481+
public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
482+
return cloneRetentionLease(
483+
getPeerRecoveryRetentionLeaseId(routingTable.primaryShard()),
484+
getPeerRecoveryRetentionLeaseId(nodeId), listener);
447485
}
448486

449487
public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.Assertions;
4141
import org.elasticsearch.ElasticsearchException;
4242
import org.elasticsearch.ExceptionsHelper;
43+
import org.elasticsearch.Version;
4344
import org.elasticsearch.action.ActionListener;
4445
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
4546
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
@@ -2597,9 +2598,17 @@ public boolean isRelocatedPrimary() {
25972598
return replicationTracker.isRelocated();
25982599
}
25992600

2600-
public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener) {
2601+
public RetentionLease addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
2602+
ActionListener<ReplicationResponse> listener) {
26012603
assert assertPrimaryMode();
2602-
replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
2604+
// only needed for BWC reasons involving rolling upgrades from versions that do not support PRRLs:
2605+
assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0);
2606+
return replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
2607+
}
2608+
2609+
public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
2610+
assert assertPrimaryMode();
2611+
return replicationTracker.cloneLocalPeerRecoveryRetentionLease(nodeId, listener);
26032612
}
26042613

26052614
public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {

0 commit comments

Comments
 (0)