Skip to content

Recover peers using history from Lucene #44853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
674ddf2
Recover peers using history from Lucene
DaveCTurner Jul 25, 2019
4d53ec9
Remove unnecessary extra logging
DaveCTurner Jul 25, 2019
56c2466
Precommit
DaveCTurner Jul 25, 2019
ec7c423
Merge branch 'peer-recovery-retention-leases' into 2019-07-25-prrl-in…
DaveCTurner Jul 25, 2019
190649d
Obtain retention lock earlier
DaveCTurner Jul 27, 2019
7222857
TBD discussed
DaveCTurner Jul 27, 2019
94dd36b
Close retention lock while establishing lease
DaveCTurner Jul 29, 2019
e9f8ff6
Merge branch 'peer-recovery-retention-leases' into 2019-07-25-prrl-in…
DaveCTurner Jul 29, 2019
5fb8bda
Create new PRRL using global checkpoint
DaveCTurner Jul 29, 2019
700e34e
Merge branch 'peer-recovery-retention-leases' into 2019-07-25-prrl-in…
DaveCTurner Jul 29, 2019
9e10171
orly
DaveCTurner Jul 29, 2019
63a6e22
Merge branch 'peer-recovery-retention-leases' into 2019-07-25-prrl-in…
DaveCTurner Jul 29, 2019
0dcfb34
Merge branch 'peer-recovery-retention-leases' into 2019-07-25-prrl-in…
DaveCTurner Jul 30, 2019
bc5149a
Assert primary mode
DaveCTurner Jul 30, 2019
8e00661
Merge branch 'peer-recovery-retention-leases' into 2019-07-25-prrl-in…
DaveCTurner Jul 31, 2019
14fde1c
Base initial GCP on the cloned retention lease
DaveCTurner Jul 31, 2019
825fc8f
Imports
DaveCTurner Jul 31, 2019
24b5806
Ensure that commit remains safe
DaveCTurner Jul 31, 2019
d63e777
Sample the GCP later and merely assert that it's ahead of the leased GCP
DaveCTurner Jul 31, 2019
f47e56e
Handle the synced-flush case which skips most of phase 1
DaveCTurner Jul 31, 2019
80cf36a
Merge branch 'peer-recovery-retention-leases' into 2019-07-25-prrl-in…
DaveCTurner Jul 31, 2019
062d6c8
Merge branch 'peer-recovery-retention-leases' into 2019-07-25-prrl-in…
DaveCTurner Aug 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ public abstract int estimateNumberOfHistoryOperations(String source,
MapperService mapperService, long startingSeqNo) throws IOException;

/**
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its translog
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
*/
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,18 +514,30 @@ public void syncTranslog() throws IOException {
}

/**
* Creates a new history snapshot for reading operations since the provided seqno from the translog.
* Creates a new history snapshot for reading operations since the provided seqno.
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
@Override
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
}

return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
}

/**
* Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
*/
@Override
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) {
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
try (Translog.Snapshot snapshot = newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo),
Long.MAX_VALUE, false)) {
return snapshot.totalOperations();
}
}

return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
}

Expand Down Expand Up @@ -2573,6 +2585,10 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS

@Override
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return getMinRetainedSeqNo() <= startingSeqNo;
}

final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
// avoid scanning translog if not necessary
if (startingSeqNo > currentLocalCheckpoint) {
Expand Down Expand Up @@ -2602,15 +2618,7 @@ public final long getMinRetainedSeqNo() {
@Override
public Closeable acquireRetentionLock() {
if (softDeleteEnabled) {
final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock();
final Closeable translogRetentionLock;
try {
translogRetentionLock = translog.acquireRetentionLock();
} catch (Exception e) {
softDeletesRetentionLock.close();
throw e;
}
return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock);
return softDeletesPolicy.acquireRetentionLock();
} else {
return translog.acquireRetentionLock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,36 @@ public RetentionLease addRetentionLease(
return retentionLease;
}

/**
* Atomically clones an existing retention lease to a new ID.
*
* @param sourceLeaseId the identifier of the source retention lease
* @param targetLeaseId the identifier of the retention lease to create
* @param listener the callback when the retention lease is successfully added and synced to replicas
* @return the new retention lease
* @throws RetentionLeaseNotFoundException if the specified source retention lease does not exist
* @throws RetentionLeaseAlreadyExistsException if the specified target retention lease already exists
*/
RetentionLease cloneRetentionLease(String sourceLeaseId, String targetLeaseId, ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(listener);
final RetentionLease retentionLease;
final RetentionLeases currentRetentionLeases;
synchronized (this) {
assert primaryMode;
if (getRetentionLeases().contains(sourceLeaseId) == false) {
throw new RetentionLeaseNotFoundException(sourceLeaseId);
}
final RetentionLease sourceLease = getRetentionLeases().get(sourceLeaseId);
retentionLease = innerAddRetentionLease(targetLeaseId, sourceLease.retainingSequenceNumber(), sourceLease.source());
currentRetentionLeases = retentionLeases;
}

// Syncing here may not be strictly necessary, because this new lease isn't retaining any extra history that wasn't previously
// retained by the source lease; however we prefer to sync anyway since we expect to do so whenever creating a new lease.
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
return retentionLease;
}

/**
* Adds a new retention lease, but does not synchronise it with the rest of the replication group.
*
Expand Down Expand Up @@ -442,8 +472,16 @@ public boolean assertRetentionLeasesPersisted(final Path path) throws IOExceptio
* containing the persistent node ID calculated by {@link ReplicationTracker#getPeerRecoveryRetentionLeaseId}, and retain operations
* with sequence numbers strictly greater than the given global checkpoint.
*/
public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener) {
addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener);
public RetentionLease addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
ActionListener<ReplicationResponse> listener) {
return addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1,
PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener);
}

public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
return cloneRetentionLease(
getPeerRecoveryRetentionLeaseId(routingTable.primaryShard()),
getPeerRecoveryRetentionLeaseId(nodeId), listener);
}

public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
Expand Down
13 changes: 11 additions & 2 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
Expand Down Expand Up @@ -2597,9 +2598,17 @@ public boolean isRelocatedPrimary() {
return replicationTracker.isRelocated();
}

public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener) {
public RetentionLease addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
ActionListener<ReplicationResponse> listener) {
assert assertPrimaryMode();
replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
// only needed for BWC reasons involving rolling upgrades from versions that do not support PRRLs:
assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0);
return replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
}

public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
assert assertPrimaryMode();
return replicationTracker.cloneLocalPeerRecoveryRetentionLease(nodeId, listener);
}

public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
Expand Down
Loading