Skip to content

Commit 7394892

Browse files
authored
Make prepare engine step of recovery source non-blocking (#37573)
Relates #37174
1 parent ca4b586 commit 7394892

File tree

8 files changed

+83
-72
lines changed

8 files changed

+83
-72
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -431,13 +431,13 @@ public interface RecoveryListener {
431431
class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
432432

433433
@Override
434-
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel,
435-
Task task) throws Exception {
436-
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
437-
)) {
438-
recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps());
434+
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
435+
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
436+
final ActionListener<TransportResponse> listener =
437+
new HandledTransportAction.ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request);
438+
recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(),
439+
ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
439440
}
440-
channel.sendResponse(TransportResponse.Empty.INSTANCE);
441441
}
442442
}
443443

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 56 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -197,51 +197,51 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
197197
assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
198198
+ startingSeqNo + "]";
199199

200-
final TimeValue prepareEngineTime;
201-
try {
202-
// For a sequence based recovery, the target can keep its local translog
203-
prepareEngineTime = prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
204-
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
205-
} catch (final Exception e) {
206-
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
207-
}
200+
final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
201+
// For a sequence based recovery, the target can keep its local translog
202+
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
203+
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
204+
final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
205+
prepareEngineStep.whenComplete(prepareEngineTime -> {
206+
/*
207+
* Add the shard to the replication group now that the engine is open (the shard will receive replication requests from this point on).
208+
* This means that any document indexed into the primary after this will be replicated to this replica as well.
209+
* Make sure to do this before sampling the max sequence number in the next step, to ensure that we send
210+
* all documents up to maxSeqNo in phase2.
211+
*/
212+
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
213+
shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
208214

209-
/*
210-
* Add the shard to the replication group now that the engine is open (the shard will receive replication requests from this point on).
211-
* This means that any document indexed into the primary after this will be replicated to this replica as well.
212-
* Make sure to do this before sampling the max sequence number in the next step, to ensure that we send
213-
* all documents up to maxSeqNo in phase2.
214-
*/
215-
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
216-
shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
217-
218-
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
219-
/*
220-
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
221-
* operations in the required range will be available for replaying from the translog of the source.
222-
*/
223-
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
224-
225-
if (logger.isTraceEnabled()) {
226-
logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
227-
logger.trace("snapshot translog for recovery; current size is [{}]",
228-
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
229-
}
215+
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
216+
/*
217+
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
218+
* operations in the required range will be available for replaying from the translog of the source.
219+
*/
220+
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
221+
if (logger.isTraceEnabled()) {
222+
logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
223+
logger.trace("snapshot translog for recovery; current size is [{}]",
224+
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
225+
}
226+
final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
227+
resources.add(phase2Snapshot);
228+
// we can release the retention lock here because the snapshot itself will retain the required operations.
229+
retentionLock.close();
230+
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
231+
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
232+
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
233+
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
234+
phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
235+
maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
236+
sendSnapshotStep.whenComplete(
237+
r -> IOUtils.close(phase2Snapshot),
238+
e -> {
239+
IOUtils.closeWhileHandlingException(phase2Snapshot);
240+
onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e));
241+
});
242+
243+
}, onFailure);
230244

231-
final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
232-
resources.add(phase2Snapshot);
233-
// we can release the retention lock here because the snapshot itself will retain the required operations.
234-
IOUtils.close(retentionLock);
235-
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
236-
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
237-
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
238-
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
239-
final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
240-
phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
241-
maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
242-
sendSnapshotStep.whenComplete(
243-
r -> IOUtils.close(phase2Snapshot),
244-
e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e)));
245245
final StepListener<Void> finalizeStep = new StepListener<>();
246246
sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
247247

@@ -251,7 +251,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
251251
final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
252252
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
253253
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
254-
prepareEngineTime.millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
254+
prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
255255
try {
256256
wrappedListener.onResponse(response);
257257
} finally {
@@ -484,16 +484,21 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer>
484484
}
485485
}
486486

487-
TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
487+
void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<TimeValue> listener) {
488488
StopWatch stopWatch = new StopWatch().start();
489-
logger.trace("recovery [phase1]: prepare remote engine for translog");
489+
final ActionListener<Void> wrappedListener = ActionListener.wrap(
490+
nullVal -> {
491+
stopWatch.stop();
492+
final TimeValue tookTime = stopWatch.totalTime();
493+
logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
494+
listener.onResponse(tookTime);
495+
},
496+
e -> listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e)));
490497
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
491498
// garbage collection (not the JVM's GC!) of tombstone deletes.
492-
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps));
493-
stopWatch.stop();
494-
final TimeValue tookTime = stopWatch.totalTime();
495-
logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
496-
return tookTime;
499+
logger.trace("recovery [phase1]: prepare remote engine for translog");
500+
cancellableThreads.execute(() ->
501+
recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, wrappedListener));
497502
}
498503

499504
/**

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -366,9 +366,12 @@ private void ensureRefCount() {
366366
/*** Implementation of {@link RecoveryTargetHandler } */
367367

368368
@Override
369-
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
370-
state().getTranslog().totalOperations(totalTranslogOps);
371-
indexShard().openEngineAndSkipTranslogRecovery();
369+
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
370+
ActionListener.completeWith(listener, () -> {
371+
state().getTranslog().totalOperations(totalTranslogOps);
372+
indexShard().openEngineAndSkipTranslogRecovery();
373+
return null;
374+
});
372375
}
373376

374377
@Override

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public interface RecoveryTargetHandler {
3535
* @param fileBasedRecovery whether or not this call is part of a file-based recovery
3636
* @param totalTranslogOps total translog operations expected to be sent
3737
*/
38-
void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException;
38+
void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener);
3939

4040
/**
4141
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and

server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,12 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe
7777
}
7878

7979
@Override
80-
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
80+
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
8181
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
82-
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
83-
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
84-
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
82+
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
83+
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
84+
new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
85+
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
8586
}
8687

8788
@Override

server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.lucene.search.TermQuery;
2626
import org.apache.lucene.search.TopDocs;
2727
import org.elasticsearch.Version;
28+
import org.elasticsearch.action.ActionListener;
2829
import org.elasticsearch.action.DocWriteResponse;
2930
import org.elasticsearch.action.bulk.BulkItemResponse;
3031
import org.elasticsearch.action.bulk.BulkShardRequest;
@@ -198,13 +199,14 @@ public IndexResult index(Index op) throws IOException {
198199
Future<Void> fut = shards.asyncRecoverReplica(replica,
199200
(shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){
200201
@Override
201-
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
202+
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps,
203+
ActionListener<Void> listener) {
202204
try {
203205
indexedOnPrimary.await();
204206
} catch (InterruptedException e) {
205207
throw new AssertionError(e);
206208
}
207-
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps);
209+
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
208210
}
209211
});
210212
fut.get();

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2573,8 +2573,8 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException {
25732573
}) {
25742574
// we're only checking that listeners are called once the engine is open; before that there is no point
25752575
@Override
2576-
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
2577-
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps);
2576+
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
2577+
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
25782578
assertListenerCalled.accept(replica);
25792579
}
25802580

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -491,9 +491,9 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer>
491491
}
492492

493493
@Override
494-
TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
494+
void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<TimeValue> listener) {
495495
prepareTargetForTranslogCalled.set(true);
496-
return super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps);
496+
super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps, listener);
497497
}
498498

499499
@Override
@@ -700,7 +700,7 @@ private List<StoreFileMetaData> generateFiles(Store store, int numFiles, IntSupp
700700

701701
class TestRecoveryTargetHandler implements RecoveryTargetHandler {
702702
@Override
703-
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) {
703+
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
704704
}
705705

706706
@Override

0 commit comments

Comments
 (0)