-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Integrate retention leases to recovery from remote #38829
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
82d464d
6138176
9ca7526
530b55b
f6e129e
d294e99
11120d4
b13c6c5
ab4d61c
30f1366
e58bbbc
d5dee9b
ae9cff2
d027302
a42d229
2e6e705
cbe044c
0ac5533
6791827
0dcf6c9
b8b2537
1633dfb
4005309
086e87e
69d757c
cdf9bdc
fa7b4df
5bbb07f
6fc440b
30232bb
d45d829
8fd90fe
21edf1d
7e547ee
d471872
fc9a300
2c77ad4
6da443a
4262372
dec3d28
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,9 @@ | |
|
||
import com.carrotsearch.hppc.cursors.IntObjectCursor; | ||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.apache.lucene.index.IndexCommit; | ||
import org.elasticsearch.ElasticsearchException; | ||
import org.elasticsearch.ExceptionsHelper; | ||
|
@@ -29,13 +32,17 @@ | |
import org.elasticsearch.common.collect.Tuple; | ||
import org.elasticsearch.common.component.AbstractLifecycleComponent; | ||
import org.elasticsearch.common.metrics.CounterMetric; | ||
import org.elasticsearch.common.settings.Setting; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.ByteSizeValue; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.common.util.concurrent.AbstractRunnable; | ||
import org.elasticsearch.index.Index; | ||
import org.elasticsearch.index.engine.EngineException; | ||
import org.elasticsearch.index.seqno.LocalCheckpointTracker; | ||
import org.elasticsearch.index.seqno.RetentionLeaseActions; | ||
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException; | ||
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; | ||
import org.elasticsearch.index.shard.IndexShard; | ||
import org.elasticsearch.index.shard.IndexShardRecoveryException; | ||
import org.elasticsearch.index.shard.ShardId; | ||
|
@@ -55,6 +62,7 @@ | |
import org.elasticsearch.snapshots.SnapshotInfo; | ||
import org.elasticsearch.snapshots.SnapshotShardFailure; | ||
import org.elasticsearch.snapshots.SnapshotState; | ||
import org.elasticsearch.threadpool.Scheduler; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.xpack.ccr.Ccr; | ||
import org.elasticsearch.xpack.ccr.CcrLicenseChecker; | ||
|
@@ -77,11 +85,14 @@ | |
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.LongConsumer; | ||
import java.util.function.Supplier; | ||
|
||
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; | ||
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; | ||
|
||
|
||
|
@@ -91,6 +102,8 @@ | |
*/ | ||
public class CcrRepository extends AbstractLifecycleComponent implements Repository { | ||
|
||
private static final Logger logger = LogManager.getLogger(CcrRepository.class); | ||
|
||
public static final String LATEST = "_latest_"; | ||
public static final String TYPE = "_ccr_"; | ||
public static final String NAME_PREFIX = "_ccr_"; | ||
|
@@ -291,12 +304,47 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v | |
store.decRef(); | ||
} | ||
|
||
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); | ||
|
||
Map<String, String> ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); | ||
String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); | ||
Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID); | ||
ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId()); | ||
|
||
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); | ||
final String retentionLeaseId = indexShard.shardId().getIndex().getUUID() + "-following-" + leaderUUID; | ||
logger.trace( | ||
() -> new ParameterizedMessage("{} requesting leader primary to add retention lease [" + retentionLeaseId + "]", shardId)); | ||
final Optional<RetentionLeaseAlreadyExistsException> maybeAddAlready = | ||
addRetentionLease(leaderShardId, retentionLeaseId, remoteClient); | ||
maybeAddAlready.ifPresent(addAlready -> { | ||
logger.trace(() -> new ParameterizedMessage("{} retention lease already exists, requesting a renewal", shardId), addAlready); | ||
final Optional<RetentionLeaseNotFoundException> maybeRenewNotFound = | ||
renewRetentionLease(leaderShardId, retentionLeaseId, remoteClient); | ||
maybeRenewNotFound.ifPresent(renewNotFound -> { | ||
logger.trace(() -> new ParameterizedMessage( | ||
"{} retention lease not found while attempting to renew, attempting a final add", shardId), renewNotFound); | ||
final Optional<RetentionLeaseAlreadyExistsException> maybeFallbackAddAlready = | ||
addRetentionLease(leaderShardId, retentionLeaseId, remoteClient); | ||
maybeFallbackAddAlready.ifPresent(fallbackAddAlready -> { | ||
/* | ||
* At this point we tried to add the lease and the retention lease already existed. By the time we tried to renew the | ||
* lease, it expired or was removed. We tried to add the lease again and it already exists? Bail. | ||
*/ | ||
assert false : fallbackAddAlready; | ||
throw fallbackAddAlready; | ||
}); | ||
}); | ||
}); | ||
|
||
// schedule renewals to run during the restore | ||
final Scheduler.Cancellable renewable = threadPool.scheduleWithFixedDelay( | ||
() -> { | ||
logger.trace("{} background renewing retention lease during restore", shardId); | ||
renewRetentionLease(leaderShardId, retentionLeaseId, remoteClient); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as we except this to possibly throw an exception, should we log it here instead of bubbling it up to uncaught exception handler? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I took care of this. There was more to do here, including authorizing for security. See a42d229. |
||
}, | ||
RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(indexShard.indexSettings().getSettings()), | ||
Ccr.CCR_THREAD_POOL_NAME); | ||
|
||
// TODO: There should be some local timeout. And if the remote cluster returns an unknown session | ||
// response, we should be able to retry by creating a new session. | ||
String name = metadata.name(); | ||
|
@@ -305,9 +353,48 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v | |
updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, indexShard.routingEntry().index()); | ||
} catch (Exception e) { | ||
throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e); | ||
} finally { | ||
renewable.cancel(); | ||
} | ||
} | ||
|
||
private Optional<RetentionLeaseAlreadyExistsException> addRetentionLease( | ||
final ShardId leaderShardId, | ||
final String retentionLeaseId, | ||
final Client remoteClient) { | ||
try { | ||
final RetentionLeaseActions.AddRequest request = | ||
new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); | ||
remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(ccrSettings.getRecoveryActionTimeout()); | ||
return Optional.empty(); | ||
} catch (final RetentionLeaseAlreadyExistsException e) { | ||
return Optional.of(e); | ||
} | ||
} | ||
|
||
private Optional<RetentionLeaseNotFoundException> renewRetentionLease( | ||
final ShardId leaderShardId, | ||
final String retentionLeaseId, | ||
final Client remoteClient) { | ||
try { | ||
final RetentionLeaseActions.RenewRequest request = | ||
new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); | ||
remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request).actionGet(ccrSettings.getRecoveryActionTimeout()); | ||
return Optional.empty(); | ||
} catch (final RetentionLeaseNotFoundException e) { | ||
return Optional.of(e); | ||
} | ||
} | ||
|
||
// this setting is intentionally not registered, it is only used in tests | ||
public static final Setting<TimeValue> RETENTION_LEASE_RENEW_INTERVAL_SETTING = | ||
Setting.timeSetting( | ||
"index.ccr.retention_lease.renew_interval", | ||
jasontedor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
new TimeValue(5, TimeUnit.MINUTES), | ||
new TimeValue(0, TimeUnit.MILLISECONDS), | ||
Setting.Property.Dynamic, | ||
Setting.Property.IndexScope); | ||
|
||
@Override | ||
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) { | ||
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); | ||
|
Uh oh!
There was an error while loading. Please reload this page.