diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index cbc4e4abc2bfe..e523d452c75b1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -132,16 +132,7 @@ protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer h final Index followerIndex = params.getFollowShardId().getIndex(); final Index leaderIndex = params.getLeaderShardId().getIndex(); final Supplier timeout = () -> isStopped() ? TimeValue.MINUS_ONE : waitForMetadataTimeOut; - - final Client remoteClient; - try { - remoteClient = remoteClient(params); - } catch (NoSuchRemoteClusterException e) { - errorHandler.accept(e); - return; - } - - CcrRequests.getIndexMetadata(remoteClient, leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap( + final ActionListener listener = ActionListener.wrap( indexMetaData -> { if (indexMetaData.mapping() == null) { assert indexMetaData.getMappingVersion() == 1; @@ -155,7 +146,12 @@ protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer h errorHandler)); }, errorHandler - )); + ); + try { + CcrRequests.getIndexMetadata(remoteClient(params), leaderIndex, minRequiredMappingVersion, 0L, timeout, listener); + } catch (NoSuchRemoteClusterException e) { + errorHandler.accept(e); + } } @Override @@ -424,21 +420,27 @@ protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final Lo "{} background adding retention lease [{}] while following", params.getFollowShardId(), retentionLeaseId); - CcrRetentionLeases.asyncAddRetentionLease( + try { + final ActionListener wrappedListener = ActionListener.wrap( + r -> {}, + inner -> { + /* + * If this fails that the retention lease already exists, something highly unusual is + * going on. Log it, and renew again after another renew interval has passed. + */ + final Throwable innerCause = ExceptionsHelper.unwrapCause(inner); + logRetentionLeaseFailure(retentionLeaseId, innerCause); + }); + CcrRetentionLeases.asyncAddRetentionLease( params.getLeaderShardId(), retentionLeaseId, followerGlobalCheckpoint.getAsLong(), remoteClient(params), - ActionListener.wrap( - r -> {}, - inner -> { - /* - * If this fails that the retention lease already exists, something highly unusual is - * going on. Log it, and renew again after another renew interval has passed. - */ - final Throwable innerCause = ExceptionsHelper.unwrapCause(inner); - logRetentionLeaseFailure(retentionLeaseId, innerCause); - })); + wrappedListener); + } catch (NoSuchRemoteClusterException rce) { + // we will attempt to renew again after another renew interval has passed + logRetentionLeaseFailure(retentionLeaseId, rce); + } } else { // if something else happened, we will attempt to renew again after another renew interval has passed }