Commit 9358784
Handle no such remote cluster exception in ccr (#53415)
A remote client can throw a NoSuchRemoteClusterException while fetching the cluster state from the leader cluster. We also need to handle that exception when retrying to add a retention lease to the leader shard.

Closes #53225
1 parent 0139e7c commit 9358784
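
To illustrate the pattern this commit applies: looking up the remote client can itself throw NoSuchRemoteClusterException, and the fix routes that exception into the task's existing error handling rather than letting it propagate out of the background loop. Below is a minimal, self-contained sketch of that pattern; the NoSuchRemoteClusterException stand-in, Client interface, remoteClient, and errorHandler names here are hypothetical illustrations, not the actual Elasticsearch CCR code.

    import java.util.function.Consumer;

    public class RemoteCallSketch {

        // Stand-in for Elasticsearch's NoSuchRemoteClusterException.
        static class NoSuchRemoteClusterException extends RuntimeException {
            NoSuchRemoteClusterException(String alias) {
                super("no such remote cluster: [" + alias + "]");
            }
        }

        interface Client {}

        // Resolving the remote client can itself throw, for example when the
        // remote cluster has been removed from the settings while a follower
        // task is still running.
        static Client remoteClient(String alias) {
            throw new NoSuchRemoteClusterException(alias);
        }

        static void callLeader(String alias, Consumer<Exception> errorHandler) {
            try {
                Client remote = remoteClient(alias);
                System.out.println("fetched state via " + remote);
            } catch (NoSuchRemoteClusterException e) {
                // Hand the failure to the task's error handler (which schedules
                // a retry) rather than letting it escape the background loop.
                errorHandler.accept(e);
            }
        }

        public static void main(String[] args) {
            callLeader("leader", e -> System.out.println("handled: " + e.getMessage()));
        }
    }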

1 file changed (+25 −23 lines)

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

@@ -151,18 +151,9 @@ protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer h
         final Index followerIndex = params.getFollowShardId().getIndex();
         final Index leaderIndex = params.getLeaderShardId().getIndex();
         final Supplier<TimeValue> timeout = () -> isStopped() ? TimeValue.MINUS_ONE : waitForMetadataTimeOut;
-
-        final Client remoteClient;
-        try {
-            remoteClient = remoteClient(params);
-        } catch (NoSuchRemoteClusterException e) {
-            errorHandler.accept(e);
-            return;
-        }
-
-        CcrRequests.getIndexMetadata(remoteClient, leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap(
+        final ActionListener<IndexMetaData> listener = ActionListener.wrap(
             indexMetaData -> {
-                if (indexMetaData.getMappings().isEmpty()) {
+                if (indexMetaData.mapping() == null) {
                     assert indexMetaData.getMappingVersion() == 1;
                     handler.accept(indexMetaData.getMappingVersion());
                     return;
@@ -176,7 +167,12 @@ protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer h
                     errorHandler));
             },
             errorHandler
-        ));
+        );
+        try {
+            CcrRequests.getIndexMetadata(remoteClient(params), leaderIndex, minRequiredMappingVersion, 0L, timeout, listener);
+        } catch (NoSuchRemoteClusterException e) {
+            errorHandler.accept(e);
+        }
     }
 
     @Override
@@ -445,21 +441,27 @@ protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final Lo
                             "{} background adding retention lease [{}] while following",
                             params.getFollowShardId(),
                             retentionLeaseId);
-                    CcrRetentionLeases.asyncAddRetentionLease(
+                    try {
+                        final ActionListener<RetentionLeaseActions.Response> wrappedListener = ActionListener.wrap(
+                            r -> {},
+                            inner -> {
+                                /*
+                                 * If this fails that the retention lease already exists, something highly unusual is
+                                 * going on. Log it, and renew again after another renew interval has passed.
+                                 */
+                                final Throwable innerCause = ExceptionsHelper.unwrapCause(inner);
+                                logRetentionLeaseFailure(retentionLeaseId, innerCause);
+                            });
+                        CcrRetentionLeases.asyncAddRetentionLease(
                             params.getLeaderShardId(),
                             retentionLeaseId,
                             followerGlobalCheckpoint.getAsLong(),
                             remoteClient(params),
-                        ActionListener.wrap(
-                            r -> {},
-                            inner -> {
-                                /*
-                                 * If this fails that the retention lease already exists, something highly unusual is
-                                 * going on. Log it, and renew again after another renew interval has passed.
-                                 */
-                                final Throwable innerCause = ExceptionsHelper.unwrapCause(inner);
-                                logRetentionLeaseFailure(retentionLeaseId, innerCause);
-                            }));
+                            wrappedListener);
+                    } catch (NoSuchRemoteClusterException rce) {
+                        // we will attempt to renew again after another renew interval has passed
+                        logRetentionLeaseFailure(retentionLeaseId, rce);
+                    }
                 } else {
                     // if something else happened, we will attempt to renew again after another renew interval has passed
                 }
