Skip to content

Commit b1f829f

Browse files
committed
Smarter CCR concurrent file chunk fetching (#38841)
The previous logic for concurrent file chunk fetching did not allow for multiple chunks from the same file to be fetched in parallel. The parallelism only allowed to fetch chunks from different files in parallel. This required complex logic on the follower to be aware from which file it was already fetching information, in order to ensure that chunks for the same file would be fetched in sequential order. During benchmarking, this exhibited throughput issues when recovery came towards the end, where it would only be sequentially fetching chunks for the same largest segment file, with throughput considerably going down in a high-latency network as there was no parallelism anymore. The new logic here follows the peer recovery model more closely, and sends multiple requests for the same file in parallel, and then reorders the results as necessary. Benchmarks show that this leads to better overall throughput and the implementation is also simpler.
1 parent c36d192 commit b1f829f

File tree

2 files changed

+24
-95
lines changed

2 files changed

+24
-95
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

+23-90
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,8 @@
7070
import java.io.Closeable;
7171
import java.io.IOException;
7272
import java.io.InputStream;
73-
import java.util.ArrayDeque;
7473
import java.util.ArrayList;
7574
import java.util.Collections;
76-
import java.util.Comparator;
7775
import java.util.HashMap;
7876
import java.util.List;
7977
import java.util.Map;
@@ -375,16 +373,6 @@ void restoreFiles() throws IOException {
375373
restore(snapshotFiles);
376374
}
377375

378-
private static class FileSession {
379-
FileSession(long lastTrackedSeqNo, long lastOffset) {
380-
this.lastTrackedSeqNo = lastTrackedSeqNo;
381-
this.lastOffset = lastOffset;
382-
}
383-
384-
final long lastTrackedSeqNo;
385-
final long lastOffset;
386-
}
387-
388376
@Override
389377
protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws IOException {
390378
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
@@ -393,116 +381,61 @@ protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws I
393381
final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
394382
final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
395383

396-
final ArrayDeque<FileInfo> remainingFiles = new ArrayDeque<>(filesToRecover);
397-
final Map<FileInfo, FileSession> inFlightRequests = new HashMap<>();
398-
final Object mutex = new Object();
399-
400-
while (true) {
401-
if (error.get() != null) {
402-
break;
403-
}
404-
final FileInfo fileToRecover;
405-
final FileSession prevFileSession;
406-
synchronized (mutex) {
407-
if (inFlightRequests.isEmpty() && remainingFiles.isEmpty()) {
408-
break;
409-
}
410-
final long maxConcurrentFileChunks = ccrSettings.getMaxConcurrentFileChunks();
411-
if (remainingFiles.isEmpty() == false && inFlightRequests.size() < maxConcurrentFileChunks) {
412-
for (int i = 0; i < maxConcurrentFileChunks; i++) {
413-
if (remainingFiles.isEmpty()) {
414-
break;
415-
}
416-
inFlightRequests.put(remainingFiles.pop(), new FileSession(NO_OPS_PERFORMED, 0));
417-
}
418-
}
419-
final Map.Entry<FileInfo, FileSession> minEntry =
420-
inFlightRequests.entrySet().stream().min(Comparator.comparingLong(e -> e.getValue().lastTrackedSeqNo)).get();
421-
prevFileSession = minEntry.getValue();
422-
fileToRecover = minEntry.getKey();
423-
}
424-
try {
425-
requestSeqIdTracker.waitForOpsToComplete(prevFileSession.lastTrackedSeqNo);
426-
final FileSession fileSession;
427-
synchronized (mutex) {
428-
fileSession = inFlightRequests.get(fileToRecover);
429-
// if file has been removed in the mean-while, it means that restore of this file completed, so start working
430-
// on the next one
431-
if (fileSession == null) {
432-
continue;
433-
}
434-
}
384+
for (FileInfo fileInfo : filesToRecover) {
385+
final long fileLength = fileInfo.length();
386+
long offset = 0;
387+
while (offset < fileLength && error.get() == null) {
435388
final long requestSeqId = requestSeqIdTracker.generateSeqNo();
436389
try {
437-
synchronized (mutex) {
438-
inFlightRequests.put(fileToRecover, new FileSession(requestSeqId, fileSession.lastOffset));
390+
requestSeqIdTracker.waitForOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks());
391+
392+
if (error.get() != null) {
393+
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
394+
break;
439395
}
440-
final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(),
441-
fileToRecover.length() - fileSession.lastOffset));
396+
397+
final int bytesRequested = Math.toIntExact(
398+
Math.min(ccrSettings.getChunkSize().getBytes(), fileLength - offset));
399+
offset += bytesRequested;
400+
442401
final GetCcrRestoreFileChunkRequest request =
443-
new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileToRecover.name(), bytesRequested);
402+
new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileInfo.name(), bytesRequested);
444403
logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
445-
fileToRecover.name(), fileSession.lastOffset, bytesRequested);
404+
fileInfo.name(), offset, bytesRequested);
446405

447406
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request,
448407
ActionListener.wrap(
449408
r -> threadPool.generic().execute(new AbstractRunnable() {
450409
@Override
451410
public void onFailure(Exception e) {
452-
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
411+
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
453412
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
454413
}
455414

456415
@Override
457416
protected void doRun() throws Exception {
458417
final int actualChunkSize = r.getChunk().length();
459418
logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId,
460-
snapshotId, fileToRecover.name(), r.getOffset(), actualChunkSize);
419+
snapshotId, fileInfo.name(), r.getOffset(), actualChunkSize);
461420
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
462421
throttleListener.accept(nanosPaused);
463-
final long newOffset = r.getOffset() + actualChunkSize;
464-
465-
assert r.getOffset() == fileSession.lastOffset;
466-
assert actualChunkSize == bytesRequested;
467-
assert newOffset <= fileToRecover.length();
468-
final boolean lastChunk = newOffset >= fileToRecover.length();
469-
multiFileWriter.writeFileChunk(fileToRecover.metadata(), r.getOffset(), r.getChunk(),
470-
lastChunk);
471-
if (lastChunk) {
472-
synchronized (mutex) {
473-
final FileSession removed = inFlightRequests.remove(fileToRecover);
474-
assert removed != null : "session disappeared for " + fileToRecover.name();
475-
assert removed.lastTrackedSeqNo == requestSeqId;
476-
assert removed.lastOffset == fileSession.lastOffset;
477-
}
478-
} else {
479-
synchronized (mutex) {
480-
final FileSession replaced = inFlightRequests.replace(fileToRecover,
481-
new FileSession(requestSeqId, newOffset));
482-
assert replaced != null : "session disappeared for " + fileToRecover.name();
483-
assert replaced.lastTrackedSeqNo == requestSeqId;
484-
assert replaced.lastOffset == fileSession.lastOffset;
485-
}
486-
}
422+
final boolean lastChunk = r.getOffset() + actualChunkSize >= fileLength;
423+
multiFileWriter.writeFileChunk(fileInfo.metadata(), r.getOffset(), r.getChunk(), lastChunk);
487424
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
488425
}
489426
}),
490427
e -> {
491-
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
428+
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
492429
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
493430
}
494431
));
495432
} catch (Exception e) {
496-
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
433+
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
497434
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
498-
throw e;
499435
}
500-
} catch (Exception e) {
501-
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
502-
break;
503436
}
504-
505437
}
438+
506439
try {
507440
requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo());
508441
} catch (InterruptedException e) {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -208,11 +208,7 @@ private Store.MetadataSnapshot getMetaData() throws IOException {
208208
}
209209

210210
private long readFileBytes(String fileName, BytesReference reference) throws IOException {
211-
Releasable lock = keyedLock.tryAcquire(fileName);
212-
if (lock == null) {
213-
throw new IllegalStateException("can't read from the same file on the same session concurrently");
214-
}
215-
try (Releasable releasable = lock) {
211+
try (Releasable ignored = keyedLock.acquire(fileName)) {
216212
final IndexInput indexInput = cachedInputs.computeIfAbsent(fileName, f -> {
217213
try {
218214
return commitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE);

0 commit comments

Comments
 (0)