Skip to content

Commit 598e00a

Browse files
committed
Make peer recovery send file info step async (#43792)
Relates #36195
1 parent 9aa6f7c commit 598e00a

File tree

8 files changed

+49
-34
lines changed

8 files changed

+49
-34
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -531,11 +531,11 @@ class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesIn
531531

532532
@Override
533533
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel, Task task) throws Exception {
534-
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
535-
)) {
536-
recoveryRef.target().receiveFileInfo(request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames,
537-
request.phase1ExistingFileSizes, request.totalTranslogOps);
538-
channel.sendResponse(TransportResponse.Empty.INSTANCE);
534+
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
535+
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.FILES_INFO, request);
536+
recoveryRef.target().receiveFileInfo(
537+
request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames, request.phase1ExistingFileSizes,
538+
request.totalTranslogOps, ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
539539
}
540540
}
541541
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -406,17 +406,25 @@ void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps
406406
logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
407407
phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes),
408408
phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes));
409-
cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
410-
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.getAsInt()));
411-
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
409+
final StepListener<Void> sendFileInfoStep = new StepListener<>();
410+
final StepListener<Void> cleanFilesStep = new StepListener<>();
411+
cancellableThreads.execute(() ->
412+
recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
413+
phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep));
414+
415+
sendFileInfoStep.whenComplete(r -> {
416+
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
417+
cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, cleanFilesStep);
418+
}, listener::onFailure);
419+
412420
final long totalSize = totalSizeInBytes;
413421
final long existingTotalSize = existingTotalSizeInBytes;
414-
cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, ActionListener.map(listener, aVoid -> {
422+
cleanFilesStep.whenComplete(r -> {
415423
final TimeValue took = stopWatch.totalTime();
416424
logger.trace("recovery [phase1]: took [{}]", took);
417-
return new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
418-
phase1ExistingFileSizes, existingTotalSize, took);
419-
}));
425+
listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
426+
phase1ExistingFileSizes, existingTotalSize, took));
427+
}, listener::onFailure);
420428
} else {
421429
logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target",
422430
recoverySourceMetadata.getSyncId());

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -378,17 +378,20 @@ public void receiveFileInfo(List<String> phase1FileNames,
378378
List<Long> phase1FileSizes,
379379
List<String> phase1ExistingFileNames,
380380
List<Long> phase1ExistingFileSizes,
381-
int totalTranslogOps) {
382-
final RecoveryState.Index index = state().getIndex();
383-
for (int i = 0; i < phase1ExistingFileNames.size(); i++) {
384-
index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true);
385-
}
386-
for (int i = 0; i < phase1FileNames.size(); i++) {
387-
index.addFileDetail(phase1FileNames.get(i), phase1FileSizes.get(i), false);
388-
}
389-
state().getTranslog().totalOperations(totalTranslogOps);
390-
state().getTranslog().totalOperationsOnStart(totalTranslogOps);
391-
381+
int totalTranslogOps,
382+
ActionListener<Void> listener) {
383+
ActionListener.completeWith(listener, () -> {
384+
final RecoveryState.Index index = state().getIndex();
385+
for (int i = 0; i < phase1ExistingFileNames.size(); i++) {
386+
index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true);
387+
}
388+
for (int i = 0; i < phase1FileNames.size(); i++) {
389+
index.addFileDetail(phase1FileNames.get(i), phase1FileSizes.get(i), false);
390+
}
391+
state().getTranslog().totalOperations(totalTranslogOps);
392+
state().getTranslog().totalOperationsOnStart(totalTranslogOps);
393+
return null;
394+
});
392395
}
393396

394397
@Override

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ void receiveFileInfo(List<String> phase1FileNames,
8888
List<Long> phase1FileSizes,
8989
List<String> phase1ExistingFileNames,
9090
List<Long> phase1ExistingFileSizes,
91-
int totalTranslogOps);
91+
int totalTranslogOps,
92+
ActionListener<Void> listener);
9293

9394
/**
9495
* After all source files have been sent over, this command is sent to the target so it can clean any local

server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,14 +129,13 @@ public void indexTranslogOperations(
129129

130130
@Override
131131
public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
132-
List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
133-
132+
List<Long> phase1ExistingFileSizes, int totalTranslogOps, ActionListener<Void> listener) {
134133
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(recoveryId, shardId,
135-
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
134+
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
136135
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILES_INFO, recoveryInfoFilesRequest,
137-
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
138-
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
139-
136+
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
137+
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
138+
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
140139
}
141140

142141
@Override

server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,13 @@ public void testWriteFileChunksConcurrently() throws Exception {
145145
final DiscoveryNode rNode = getFakeDiscoNode(targetShard.routingEntry().currentNodeId());
146146
targetShard.markAsRecovering("test-peer-recovery", new RecoveryState(targetShard.routingEntry(), rNode, pNode));
147147
final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null);
148+
final PlainActionFuture<Void> receiveFileInfoFuture = new PlainActionFuture<>();
148149
recoveryTarget.receiveFileInfo(
149150
mdFiles.stream().map(StoreFileMetaData::name).collect(Collectors.toList()),
150151
mdFiles.stream().map(StoreFileMetaData::length).collect(Collectors.toList()),
151-
Collections.emptyList(), Collections.emptyList(), 0
152+
Collections.emptyList(), Collections.emptyList(), 0, receiveFileInfoFuture
152153
);
154+
receiveFileInfoFuture.actionGet();
153155
List<RecoveryFileChunkRequest> requests = new ArrayList<>();
154156
for (StoreFileMetaData md : mdFiles) {
155157
try (IndexInput in = sourceShard.store().directory().openInput(md.name(), IOContext.READONCE)) {

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,8 @@ public void indexTranslogOperations(
753753

754754
@Override
755755
public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
756-
List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
756+
List<Long> phase1ExistingFileSizes, int totalTranslogOps, ActionListener<Void> listener) {
757+
757758
}
758759

759760
@Override

test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,9 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
6969

7070
@Override
7171
public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
72-
List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
73-
target.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
72+
List<Long> phase1ExistingFileSizes, int totalTranslogOps, ActionListener<Void> listener) {
73+
executor.execute(() -> target.receiveFileInfo(
74+
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps, listener));
7475
}
7576

7677
@Override

0 commit comments

Comments
 (0)