Skip to content

Commit dfacbf8

Browse files
committed
Smarter CCR concurrent file chunk fetching (#38841)
The previous logic for concurrent file chunk fetching did not allow for multiple chunks from the same file to be fetched in parallel. The parallelism only allowed to fetch chunks from different files in parallel. This required complex logic on the follower to be aware from which file it was already fetching information, in order to ensure that chunks for the same file would be fetched in sequential order. During benchmarking, this exhibited throughput issues when recovery came towards the end, where it would only be sequentially fetching chunks for the same largest segment file, with throughput considerably going down in a high-latency network as there was no parallelism anymore. The new logic here follows the peer recovery model more closely, and sends multiple requests for the same file in parallel, and then reorders the results as necessary. Benchmarks show that this leads to better overall throughput and the implementation is also simpler.
1 parent 10765bd commit dfacbf8

File tree

2 files changed

+24
-95
lines changed

2 files changed

+24
-95
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

Lines changed: 23 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,8 @@
7070
import java.io.Closeable;
7171
import java.io.IOException;
7272
import java.io.InputStream;
73-
import java.util.ArrayDeque;
7473
import java.util.ArrayList;
7574
import java.util.Collections;
76-
import java.util.Comparator;
7775
import java.util.HashMap;
7876
import java.util.List;
7977
import java.util.Map;
@@ -381,16 +379,6 @@ void restoreFiles() throws IOException {
381379
restore(snapshotFiles);
382380
}
383381

384-
private static class FileSession {
385-
FileSession(long lastTrackedSeqNo, long lastOffset) {
386-
this.lastTrackedSeqNo = lastTrackedSeqNo;
387-
this.lastOffset = lastOffset;
388-
}
389-
390-
final long lastTrackedSeqNo;
391-
final long lastOffset;
392-
}
393-
394382
@Override
395383
protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws IOException {
396384
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
@@ -399,116 +387,61 @@ protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws I
399387
final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
400388
final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
401389

402-
final ArrayDeque<FileInfo> remainingFiles = new ArrayDeque<>(filesToRecover);
403-
final Map<FileInfo, FileSession> inFlightRequests = new HashMap<>();
404-
final Object mutex = new Object();
405-
406-
while (true) {
407-
if (error.get() != null) {
408-
break;
409-
}
410-
final FileInfo fileToRecover;
411-
final FileSession prevFileSession;
412-
synchronized (mutex) {
413-
if (inFlightRequests.isEmpty() && remainingFiles.isEmpty()) {
414-
break;
415-
}
416-
final long maxConcurrentFileChunks = ccrSettings.getMaxConcurrentFileChunks();
417-
if (remainingFiles.isEmpty() == false && inFlightRequests.size() < maxConcurrentFileChunks) {
418-
for (int i = 0; i < maxConcurrentFileChunks; i++) {
419-
if (remainingFiles.isEmpty()) {
420-
break;
421-
}
422-
inFlightRequests.put(remainingFiles.pop(), new FileSession(NO_OPS_PERFORMED, 0));
423-
}
424-
}
425-
final Map.Entry<FileInfo, FileSession> minEntry =
426-
inFlightRequests.entrySet().stream().min(Comparator.comparingLong(e -> e.getValue().lastTrackedSeqNo)).get();
427-
prevFileSession = minEntry.getValue();
428-
fileToRecover = minEntry.getKey();
429-
}
430-
try {
431-
requestSeqIdTracker.waitForOpsToComplete(prevFileSession.lastTrackedSeqNo);
432-
final FileSession fileSession;
433-
synchronized (mutex) {
434-
fileSession = inFlightRequests.get(fileToRecover);
435-
// if file has been removed in the mean-while, it means that restore of this file completed, so start working
436-
// on the next one
437-
if (fileSession == null) {
438-
continue;
439-
}
440-
}
390+
for (FileInfo fileInfo : filesToRecover) {
391+
final long fileLength = fileInfo.length();
392+
long offset = 0;
393+
while (offset < fileLength && error.get() == null) {
441394
final long requestSeqId = requestSeqIdTracker.generateSeqNo();
442395
try {
443-
synchronized (mutex) {
444-
inFlightRequests.put(fileToRecover, new FileSession(requestSeqId, fileSession.lastOffset));
396+
requestSeqIdTracker.waitForOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks());
397+
398+
if (error.get() != null) {
399+
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
400+
break;
445401
}
446-
final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(),
447-
fileToRecover.length() - fileSession.lastOffset));
402+
403+
final int bytesRequested = Math.toIntExact(
404+
Math.min(ccrSettings.getChunkSize().getBytes(), fileLength - offset));
405+
offset += bytesRequested;
406+
448407
final GetCcrRestoreFileChunkRequest request =
449-
new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileToRecover.name(), bytesRequested);
408+
new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileInfo.name(), bytesRequested);
450409
logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
451-
fileToRecover.name(), fileSession.lastOffset, bytesRequested);
410+
fileInfo.name(), offset, bytesRequested);
452411

453412
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request,
454413
ActionListener.wrap(
455414
r -> threadPool.generic().execute(new AbstractRunnable() {
456415
@Override
457416
public void onFailure(Exception e) {
458-
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
417+
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
459418
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
460419
}
461420

462421
@Override
463422
protected void doRun() throws Exception {
464423
final int actualChunkSize = r.getChunk().length();
465424
logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId,
466-
snapshotId, fileToRecover.name(), r.getOffset(), actualChunkSize);
425+
snapshotId, fileInfo.name(), r.getOffset(), actualChunkSize);
467426
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
468427
throttleListener.accept(nanosPaused);
469-
final long newOffset = r.getOffset() + actualChunkSize;
470-
471-
assert r.getOffset() == fileSession.lastOffset;
472-
assert actualChunkSize == bytesRequested;
473-
assert newOffset <= fileToRecover.length();
474-
final boolean lastChunk = newOffset >= fileToRecover.length();
475-
multiFileWriter.writeFileChunk(fileToRecover.metadata(), r.getOffset(), r.getChunk(),
476-
lastChunk);
477-
if (lastChunk) {
478-
synchronized (mutex) {
479-
final FileSession removed = inFlightRequests.remove(fileToRecover);
480-
assert removed != null : "session disappeared for " + fileToRecover.name();
481-
assert removed.lastTrackedSeqNo == requestSeqId;
482-
assert removed.lastOffset == fileSession.lastOffset;
483-
}
484-
} else {
485-
synchronized (mutex) {
486-
final FileSession replaced = inFlightRequests.replace(fileToRecover,
487-
new FileSession(requestSeqId, newOffset));
488-
assert replaced != null : "session disappeared for " + fileToRecover.name();
489-
assert replaced.lastTrackedSeqNo == requestSeqId;
490-
assert replaced.lastOffset == fileSession.lastOffset;
491-
}
492-
}
428+
final boolean lastChunk = r.getOffset() + actualChunkSize >= fileLength;
429+
multiFileWriter.writeFileChunk(fileInfo.metadata(), r.getOffset(), r.getChunk(), lastChunk);
493430
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
494431
}
495432
}),
496433
e -> {
497-
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
434+
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
498435
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
499436
}
500437
));
501438
} catch (Exception e) {
502-
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
439+
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
503440
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
504-
throw e;
505441
}
506-
} catch (Exception e) {
507-
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
508-
break;
509442
}
510-
511443
}
444+
512445
try {
513446
requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo());
514447
} catch (InterruptedException e) {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -208,11 +208,7 @@ private Store.MetadataSnapshot getMetaData() throws IOException {
208208
}
209209

210210
private long readFileBytes(String fileName, BytesReference reference) throws IOException {
211-
Releasable lock = keyedLock.tryAcquire(fileName);
212-
if (lock == null) {
213-
throw new IllegalStateException("can't read from the same file on the same session concurrently");
214-
}
215-
try (Releasable releasable = lock) {
211+
try (Releasable ignored = keyedLock.acquire(fileName)) {
216212
final IndexInput indexInput = cachedInputs.computeIfAbsent(fileName, f -> {
217213
try {
218214
return commitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE);

0 commit comments

Comments
 (0)