-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Send file chunks asynchronously in peer recovery #39769
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0492097
f45f369
e9e3584
fd4fb60
1dc4823
24519f1
037cdc6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,18 +33,20 @@ | |
import org.elasticsearch.Version; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.StepListener; | ||
import org.elasticsearch.action.support.PlainActionFuture; | ||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; | ||
import org.elasticsearch.cluster.routing.ShardRouting; | ||
import org.elasticsearch.common.CheckedSupplier; | ||
import org.elasticsearch.common.StopWatch; | ||
import org.elasticsearch.common.bytes.BytesArray; | ||
import org.elasticsearch.common.collect.Tuple; | ||
import org.elasticsearch.common.bytes.BytesReference; | ||
import org.elasticsearch.common.lease.Releasable; | ||
import org.elasticsearch.common.logging.Loggers; | ||
import org.elasticsearch.common.lucene.store.InputStreamIndexInput; | ||
import org.elasticsearch.common.unit.ByteSizeValue; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.common.util.CancellableThreads; | ||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; | ||
import org.elasticsearch.common.util.concurrent.FutureUtils; | ||
import org.elasticsearch.core.internal.io.IOUtils; | ||
import org.elasticsearch.index.engine.Engine; | ||
|
@@ -63,17 +65,20 @@ | |
import org.elasticsearch.transport.RemoteTransportException; | ||
|
||
import java.io.Closeable; | ||
import java.io.EOFException; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.Deque; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CopyOnWriteArrayList; | ||
import java.util.concurrent.Semaphore; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.Consumer; | ||
import java.util.function.Supplier; | ||
import java.util.stream.StreamSupport; | ||
|
@@ -102,6 +107,7 @@ public class RecoverySourceHandler { | |
private final int chunkSizeInBytes; | ||
private final RecoveryTargetHandler recoveryTarget; | ||
private final int maxConcurrentFileChunks; | ||
private final List<Closeable> resources = new CopyOnWriteArrayList<>(); | ||
private final CancellableThreads cancellableThreads = new CancellableThreads(); | ||
|
||
public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget, final StartRecoveryRequest request, | ||
|
@@ -124,7 +130,6 @@ public StartRecoveryRequest getRequest() { | |
* performs the recovery from the local engine to the target | ||
*/ | ||
public void recoverToTarget(ActionListener<RecoveryResponse> listener) { | ||
final List<Closeable> resources = new CopyOnWriteArrayList<>(); | ||
final Closeable releaseResources = () -> IOUtils.close(resources); | ||
final ActionListener<RecoveryResponse> wrappedListener = ActionListener.notifyOnce(listener); | ||
try { | ||
|
@@ -411,7 +416,11 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer> | |
phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); | ||
cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo( | ||
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get())); | ||
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps); | ||
final PlainActionFuture<Void> sendFileChunksFuture = new PlainActionFuture<>(); | ||
final StoreFileMetaData[] filesSortedByLength = phase1Files.toArray(new StoreFileMetaData[0]); | ||
ArrayUtil.timSort(filesSortedByLength, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first | ||
sendFiles(store, filesSortedByLength, translogOps, sendFileChunksFuture); | ||
cancellableThreads.execute(sendFileChunksFuture::actionGet); | ||
// Send the CLEAN_FILES request, which takes all of the files that | ||
// were transferred and renames them from their temporary file | ||
// names to the actual file names. It also writes checksums for | ||
|
@@ -680,72 +689,182 @@ public String toString() { | |
'}'; | ||
} | ||
|
||
void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) throws Exception { | ||
ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first | ||
final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); | ||
final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>(); | ||
final byte[] buffer = new byte[chunkSizeInBytes]; | ||
for (final StoreFileMetaData md : files) { | ||
if (error.get() != null) { | ||
break; | ||
} | ||
try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE); | ||
InputStream in = new InputStreamIndexInput(indexInput, md.length())) { | ||
long position = 0; | ||
int bytesRead; | ||
while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) { | ||
final BytesArray content = new BytesArray(buffer, 0, bytesRead); | ||
final boolean lastChunk = position + content.length() == md.length(); | ||
final long requestSeqId = requestSeqIdTracker.generateSeqNo(); | ||
cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqId - maxConcurrentFileChunks)); | ||
cancellableThreads.checkForCancel(); | ||
if (error.get() != null) { | ||
void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps, ActionListener<Void> listener) { | ||
final MultiFileSender multiFileSender = new MultiFileSender(store, files, translogOps); | ||
resources.add(multiFileSender); // need to register to the resource list so we can clean up if the recovery gets cancelled. | ||
final ActionListener<Void> wrappedListener = ActionListener.wrap( | ||
r -> { | ||
multiFileSender.close(); | ||
listener.onResponse(null); | ||
}, | ||
e -> { | ||
IOUtils.closeWhileHandlingException(multiFileSender); | ||
listener.onFailure(e); | ||
}); | ||
multiFileSender.sendFileChunks(ActionListener.notifyOnce(wrappedListener)); | ||
} | ||
|
||
private final class MultiFileSender implements Closeable { | ||
private final Store store; | ||
private final List<StoreFileMetaData> files; | ||
private final Supplier<Integer> translogOps; | ||
private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); | ||
private final Deque<byte[]> recycledBuffers; | ||
private boolean closed; // ensure we don't reopen files if the recovery is cancelled | ||
private StoreFileMetaData md; | ||
private InputStream currentInput; | ||
private int position; | ||
private final Semaphore semaphore = new Semaphore(1); | ||
|
||
MultiFileSender(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) { | ||
this.store = store; | ||
this.files = new ArrayList<>(Arrays.asList(files)); | ||
this.translogOps = translogOps; | ||
this.recycledBuffers = ConcurrentCollections.newDeque(); | ||
} | ||
|
||
/** | ||
* File chunks are read sequentially by at most one thread. Other threads which are triggered by file-chunk responses | ||
* will abort without waiting if another thread is reading files already. This is controlled via {@code semaphore}. | ||
* | ||
* This implementation can send up to {@code maxConcurrentFileChunks} consecutive file-chunk requests without waiting | ||
* for the replies from the recovery target to reduce the recovery time in secure/compressed/high latency communication. | ||
* We assign a seqId to every file-chunk request and stop reading/sending file chunks if the gap between max_seq_no | ||
* and local_checkpoint is greater than {@code maxConcurrentFileChunks}. | ||
*/ | ||
void sendFileChunks(ActionListener<Void> listener) { | ||
assert ThreadPool.assertCurrentMethodIsNotCalledRecursively(); | ||
while (true) { | ||
cancellableThreads.checkForCancel(); | ||
synchronized (this) { | ||
if (semaphore.tryAcquire() == false) { | ||
break; | ||
} | ||
final long requestFilePosition = position; | ||
cancellableThreads.executeIO(() -> | ||
recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.get(), | ||
// don't send more; the number of unreplied chunks has already reached maxConcurrentFileChunks | ||
if (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getCheckpoint() >= maxConcurrentFileChunks) { | ||
semaphore.release(); | ||
break; | ||
} | ||
} | ||
try { | ||
final byte[] reusedBuffer = recycledBuffers.pollFirst(); | ||
final byte[] buffer = reusedBuffer != null ? reusedBuffer : new byte[chunkSizeInBytes]; | ||
final FileChunk chunk = readChunk(buffer); | ||
semaphore.release(); // other thread can read and send chunks | ||
if (chunk == null) { | ||
if (requestSeqIdTracker.getMaxSeqNo() == requestSeqIdTracker.getCheckpoint()) { | ||
dnhatn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
listener.onResponse(null); | ||
} | ||
break; | ||
} | ||
cancellableThreads.execute(() -> | ||
recoveryTarget.writeFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk, translogOps.get(), | ||
ActionListener.wrap( | ||
r -> requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId), | ||
e -> { | ||
error.compareAndSet(null, Tuple.tuple(md, e)); | ||
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); | ||
} | ||
r -> { | ||
recycledBuffers.addFirst(buffer); | ||
requestSeqIdTracker.markSeqNoAsCompleted(chunk.seqId); | ||
sendFileChunks(listener); | ||
Review comment: Can we make sure we always fork here somehow? I am a bit worried that we could end up with a stack overflow. Like we can assert that we don't have [the remainder of this comment is missing from the capture]. Reply: I opened #39988 for this. |
||
}, | ||
e -> listener.onFailure(handleErrorOnSendFiles(chunk.md, e)) | ||
))); | ||
position += content.length(); | ||
} catch (Exception e) { | ||
listener.onFailure(e); | ||
return; | ||
} | ||
} | ||
} | ||
|
||
FileChunk readChunk(final byte[] buffer) throws Exception { | ||
try { | ||
synchronized (this) { | ||
if (closed) { | ||
throw new IllegalStateException("chunk reader was closed"); | ||
} | ||
if (currentInput == null) { | ||
if (files.isEmpty()) { | ||
return null; | ||
} | ||
md = files.remove(0); | ||
position = 0; | ||
final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE); | ||
currentInput = new InputStreamIndexInput(indexInput, md.length()) { | ||
@Override | ||
public void close() throws IOException { | ||
indexInput.close(); //InputStreamIndexInput's close is noop | ||
} | ||
}; | ||
} | ||
} | ||
final int bytesRead = currentInput.read(buffer); | ||
if (bytesRead == -1) { | ||
throw new EOFException("position [" + position + "] md [" + md + "]"); | ||
} | ||
final boolean lastChunk; | ||
final long chunkPosition; | ||
synchronized (this) { | ||
chunkPosition = this.position; | ||
this.position += bytesRead; | ||
lastChunk = this.position == md.length(); | ||
if (lastChunk) { | ||
IOUtils.close(currentInput, () -> currentInput = null); | ||
} | ||
} | ||
final long seqId = requestSeqIdTracker.generateSeqNo(); | ||
return new FileChunk(seqId, md, new BytesArray(buffer, 0, bytesRead), chunkPosition, lastChunk); | ||
} catch (Exception e) { | ||
error.compareAndSet(null, Tuple.tuple(md, e)); | ||
break; | ||
throw handleErrorOnSendFiles(md, e); | ||
} | ||
} | ||
// When we terminate exceptionally, we don't wait for the outstanding requests as we don't use their results anyway. | ||
// This allows us to end quickly and eliminate the complexity of handling requestSeqIds in case of error. | ||
if (error.get() == null) { | ||
cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo())); | ||
|
||
@Override | ||
public synchronized void close() throws IOException { | ||
dnhatn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (closed == false) { | ||
closed = true; | ||
IOUtils.close(currentInput); | ||
} | ||
} | ||
if (error.get() != null) { | ||
handleErrorOnSendFiles(store, error.get().v1(), error.get().v2()); | ||
|
||
Exception handleErrorOnSendFiles(StoreFileMetaData md, Exception e) { | ||
try { | ||
final IOException corruptIndexException; | ||
if (md != null && (corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) { | ||
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! | ||
logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md); | ||
try { | ||
failEngine(corruptIndexException); | ||
} catch (Exception inner) { | ||
corruptIndexException.addSuppressed(inner); | ||
} | ||
return corruptIndexException; | ||
} else { // corruption has happened on the way to replica | ||
final RemoteTransportException remoteTransportException = | ||
new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null); | ||
remoteTransportException.addSuppressed(e); | ||
logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK", | ||
shardId, request.targetNode(), md), corruptIndexException); | ||
return remoteTransportException; | ||
} | ||
} | ||
} catch (Exception inner) { | ||
e.addSuppressed(inner); | ||
} | ||
return e; | ||
} | ||
} | ||
|
||
private void handleErrorOnSendFiles(Store store, StoreFileMetaData md, Exception e) throws Exception { | ||
final IOException corruptIndexException; | ||
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) { | ||
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! | ||
logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md); | ||
failEngine(corruptIndexException); | ||
throw corruptIndexException; | ||
} else { // corruption has happened on the way to replica | ||
RemoteTransportException exception = new RemoteTransportException( | ||
"File corruption occurred on recovery but checksums are ok", null); | ||
exception.addSuppressed(e); | ||
logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK", | ||
shardId, request.targetNode(), md), corruptIndexException); | ||
throw exception; | ||
} | ||
} else { | ||
throw e; | ||
private static final class FileChunk { | ||
final long seqId; | ||
final StoreFileMetaData md; | ||
final BytesReference content; | ||
final long position; | ||
final boolean lastChunk; | ||
|
||
FileChunk(long seqId, StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) { | ||
this.seqId = seqId; | ||
this.md = md; | ||
this.content = content; | ||
this.position = position; | ||
this.lastChunk = lastChunk; | ||
} | ||
} | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.