Skip to content

Commit 5dd6c68

Browse files
committed
Return recovery to generic thread post-PRRL action (#44000)
Today, during recovery, we invoke several `TransportReplicationAction` derivatives, and these actions invoke their response handlers on the transport thread. This change moves the continued execution of the recovery back onto the generic threadpool once each such action completes.
1 parent a4d5cf1 commit 5dd6c68

File tree

1 file changed

+23
-13
lines changed

1 file changed

+23
-13
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

+23-13
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.Version;
3434
import org.elasticsearch.action.ActionListener;
3535
import org.elasticsearch.action.StepListener;
36+
import org.elasticsearch.action.support.ThreadedActionListener;
3637
import org.elasticsearch.action.support.replication.ReplicationResponse;
3738
import org.elasticsearch.cluster.metadata.IndexMetaData;
3839
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -66,6 +67,7 @@
6667
import org.elasticsearch.index.translog.Translog;
6768
import org.elasticsearch.threadpool.ThreadPool;
6869
import org.elasticsearch.transport.RemoteTransportException;
70+
import org.elasticsearch.transport.Transports;
6971

7072
import java.io.Closeable;
7173
import java.io.IOException;
@@ -146,8 +148,10 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
146148
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
147149
throw e;
148150
});
149-
final Consumer<Exception> onFailure = e ->
151+
final Consumer<Exception> onFailure = e -> {
152+
Transports.assertNotTransportThread("failure of recovery from " + shard.routingEntry() + " to " + request.targetNode());
150153
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
154+
};
151155

152156
runUnderPrimaryPermit(() -> {
153157
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
@@ -208,7 +212,9 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
208212
// If the target previously had a copy of this shard then a file-based recovery might move its global
209213
// checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
210214
// new one later on in the recovery.
211-
shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), deleteRetentionLeaseStep);
215+
shard.removePeerRecoveryRetentionLease(request.targetNode().getId(),
216+
new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC,
217+
deleteRetentionLeaseStep, false));
212218
} catch (RetentionLeaseNotFoundException e) {
213219
logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
214220
deleteRetentionLeaseStep.onResponse(null);
@@ -220,6 +226,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
220226
}
221227

222228
deleteRetentionLeaseStep.whenComplete(ignored -> {
229+
Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");
223230
phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
224231
}, onFailure);
225232

@@ -233,30 +240,33 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
233240
if (shard.indexSettings().isSoftDeleteEnabled()
234241
&& shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) {
235242
runUnderPrimaryPermit(() -> {
236-
try {
237-
// conservative estimate of the GCP for creating the lease. TODO use the actual GCP once it's appropriate
238-
final long globalCheckpoint = startingSeqNo - 1;
239-
// blindly create the lease. TODO integrate this with the recovery process
240-
shard.addPeerRecoveryRetentionLease(
241-
request.targetNode().getId(), globalCheckpoint, establishRetentionLeaseStep);
242-
} catch (RetentionLeaseAlreadyExistsException e) {
243-
logger.debug("peer-recovery retention lease already exists", e);
244-
establishRetentionLeaseStep.onResponse(null);
245-
}
246-
}, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]",
243+
try {
244+
// conservative estimate of the GCP for creating the lease. TODO use the actual GCP once it's appropriate
245+
final long globalCheckpoint = startingSeqNo - 1;
246+
// blindly create the lease. TODO integrate this with the recovery process
247+
shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), globalCheckpoint,
248+
new ThreadedActionListener<>(logger, shard.getThreadPool(),
249+
ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false));
250+
} catch (RetentionLeaseAlreadyExistsException e) {
251+
logger.debug("peer-recovery retention lease already exists", e);
252+
establishRetentionLeaseStep.onResponse(null);
253+
}
254+
}, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]",
247255
shard, cancellableThreads, logger);
248256
} else {
249257
establishRetentionLeaseStep.onResponse(null);
250258
}
251259
}, onFailure);
252260

253261
establishRetentionLeaseStep.whenComplete(r -> {
262+
Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
254263
// For a sequence based recovery, the target can keep its local translog
255264
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
256265
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
257266
}, onFailure);
258267

259268
prepareEngineStep.whenComplete(prepareEngineTime -> {
269+
Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]");
260270
/*
261271
* add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
262272
* This means that any document indexed into the primary after this will be replicated to this replica as well

0 commit comments

Comments
 (0)