Skip to content

Commit cbbd230

Browse files
authored
Fix CCR concurrent file chunk fetching bug (elastic#38736)
Fixes a bug with concurrent file chunk fetching during recovery from remote where the wrong offset was used.
1 parent ed73bb7 commit cbbd230

File tree

2 files changed

+25
-7
lines changed

2 files changed

+25
-7
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws I
402402
break;
403403
}
404404
final FileInfo fileToRecover;
405-
final FileSession fileSession;
405+
final FileSession prevFileSession;
406406
synchronized (mutex) {
407407
if (inFlightRequests.isEmpty() && remainingFiles.isEmpty()) {
408408
break;
@@ -418,15 +418,17 @@ protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws I
418418
}
419419
final Map.Entry<FileInfo, FileSession> minEntry =
420420
inFlightRequests.entrySet().stream().min(Comparator.comparingLong(e -> e.getValue().lastTrackedSeqNo)).get();
421-
fileSession = minEntry.getValue();
421+
prevFileSession = minEntry.getValue();
422422
fileToRecover = minEntry.getKey();
423423
}
424424
try {
425-
requestSeqIdTracker.waitForOpsToComplete(fileSession.lastTrackedSeqNo);
425+
requestSeqIdTracker.waitForOpsToComplete(prevFileSession.lastTrackedSeqNo);
426+
final FileSession fileSession;
426427
synchronized (mutex) {
428+
fileSession = inFlightRequests.get(fileToRecover);
427429
// if file has been removed in the mean-while, it means that restore of this file completed, so start working
428430
// on the next one
429-
if (inFlightRequests.containsKey(fileToRecover) == false) {
431+
if (fileSession == null) {
430432
continue;
431433
}
432434
}
@@ -439,7 +441,8 @@ protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws I
439441
fileToRecover.length() - fileSession.lastOffset));
440442
final GetCcrRestoreFileChunkRequest request =
441443
new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileToRecover.name(), bytesRequested);
442-
logger.trace("[{}] [{}] fetching chunk for file [{}]", shardId, snapshotId, fileToRecover.name());
444+
logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
445+
fileToRecover.name(), fileSession.lastOffset, bytesRequested);
443446

444447
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request,
445448
ActionListener.wrap(
@@ -453,23 +456,32 @@ public void onFailure(Exception e) {
453456
@Override
454457
protected void doRun() throws Exception {
455458
final int actualChunkSize = r.getChunk().length();
459+
logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId,
460+
snapshotId, fileToRecover.name(), r.getOffset(), actualChunkSize);
456461
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
457462
throttleListener.accept(nanosPaused);
458463
final long newOffset = r.getOffset() + actualChunkSize;
464+
465+
assert r.getOffset() == fileSession.lastOffset;
466+
assert actualChunkSize == bytesRequested;
459467
assert newOffset <= fileToRecover.length();
460468
final boolean lastChunk = newOffset >= fileToRecover.length();
461469
multiFileWriter.writeFileChunk(fileToRecover.metadata(), r.getOffset(), r.getChunk(),
462470
lastChunk);
463471
if (lastChunk) {
464472
synchronized (mutex) {
465-
final FileSession session = inFlightRequests.remove(fileToRecover);
466-
assert session != null : "session disappeared for " + fileToRecover.name();
473+
final FileSession removed = inFlightRequests.remove(fileToRecover);
474+
assert removed != null : "session disappeared for " + fileToRecover.name();
475+
assert removed.lastTrackedSeqNo == requestSeqId;
476+
assert removed.lastOffset == fileSession.lastOffset;
467477
}
468478
} else {
469479
synchronized (mutex) {
470480
final FileSession replaced = inFlightRequests.replace(fileToRecover,
471481
new FileSession(requestSeqId, newOffset));
472482
assert replaced != null : "session disappeared for " + fileToRecover.name();
483+
assert replaced.lastTrackedSeqNo == requestSeqId;
484+
assert replaced.lastOffset == fileSession.lastOffset;
473485
}
474486
}
475487
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,12 @@ public class IndexFollowingIT extends CcrIntegTestCase {
106106
public void testFollowIndex() throws Exception {
107107
final int numberOfPrimaryShards = randomIntBetween(1, 3);
108108
int numberOfReplicas = between(0, 1);
109+
110+
followerClient().admin().cluster().prepareUpdateSettings()
111+
.setTransientSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(),
112+
new ByteSizeValue(randomIntBetween(1, 1000), ByteSizeUnit.KB)))
113+
.get();
114+
109115
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, numberOfReplicas,
110116
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
111117
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));

0 commit comments

Comments
 (0)