Skip to content

Make peer recovery send file chunks async #44040

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Jul 16, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
25d0156
Peer recovery sends file chunks non-blocking
dnhatn Jun 24, 2019
782920c
read ahead
dnhatn Jul 6, 2019
eb4b097
Revert "read ahead"
dnhatn Jul 6, 2019
8d64abe
style check
dnhatn Jul 6, 2019
5c1977a
merge MultiFileReader
dnhatn Jul 8, 2019
93d45c4
inline
dnhatn Jul 8, 2019
472bd70
report error in the loop
dnhatn Jul 8, 2019
97b2099
henning’s comment
dnhatn Jul 9, 2019
ec2194f
move checking error down
dnhatn Jul 9, 2019
e8dfd75
integrate with AsyncIOProcessor
dnhatn Jul 9, 2019
649aad2
Merge branch 'master' into send-chunks
dnhatn Jul 9, 2019
6065672
use single buffer
dnhatn Jul 10, 2019
77d00ef
prepare for ccr
dnhatn Jul 10, 2019
3a398b4
naming
dnhatn Jul 10, 2019
32dccb3
style check
dnhatn Jul 11, 2019
2d9aae8
integrate with ccr
dnhatn Jul 11, 2019
d49922f
Revert "integrate with ccr"
dnhatn Jul 11, 2019
a41592c
Henning’s comments
dnhatn Jul 11, 2019
6e3546d
Merge branch 'master' into send-chunks
dnhatn Jul 11, 2019
28e2664
unused settings
dnhatn Jul 11, 2019
28cface
wording
dnhatn Jul 11, 2019
caab1de
Merge branch 'master' into send-chunks
dnhatn Jul 12, 2019
78a8a4e
remove not used resp now
dnhatn Jul 12, 2019
6f8f7a4
add status enum
dnhatn Jul 12, 2019
c2b3fdf
read ahead
dnhatn Jul 12, 2019
dc60dd8
Merge branch 'master' into send-chunks
dnhatn Jul 12, 2019
d582b09
add status to assertion
dnhatn Jul 12, 2019
fb8bb6e
notify once
dnhatn Jul 13, 2019
8cf85d5
missing word in comment
dnhatn Jul 13, 2019
6d76894
let bubble up exception
dnhatn Jul 15, 2019
21c2a0a
super close
dnhatn Jul 15, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.Loggers;
Expand Down Expand Up @@ -81,6 +82,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -704,12 +706,12 @@ private class MultiFileSender extends ActionRunnable<Void> implements Closeable
private final IntSupplier translogOps;
private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
private final Semaphore semaphore = new Semaphore(0);
private boolean closed = false;
private final Iterator<StoreFileMetaData> remainingFiles;
private StoreFileMetaData currentFile;
private InputStreamIndexInput currentInput = null;
private long currentChunkPosition = 0;
private final Deque<byte[]> recycledBuffers = ConcurrentCollections.newDeque();
private final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();

MultiFileSender(Store store, IntSupplier translogOps, StoreFileMetaData[] files, ActionListener<Void> listener) {
super(ActionListener.notifyOnce(listener));
Expand All @@ -724,6 +726,10 @@ protected void doRun() throws Exception {
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[send file chunk]");
while (true) {
assert semaphore.availablePermits() == 0;
if (error.get() != null) {
handleErrorOnSendFiles(store, error.get().v2(), new StoreFileMetaData[]{error.get().v1()});
throw error.get().v2();
}
final FileChunk chunk = readNextChunk();
if (chunk == null) {
semaphore.release(); // allow other threads respond if we are not done yet.
Expand All @@ -743,25 +749,20 @@ protected void doRun() throws Exception {
sendFileExecutor.execute(this); // fork off from the network thread
}
},
// need to fork as `handleErrorOnSendFiles` might read some files which should not happen on the network thread
e -> sendFileExecutor.execute(new ActionRunnable<>(this.listener) {
@Override
protected void doRun() throws Exception {
cancellableThreads.execute(semaphore::acquire);
try (Releasable ignored = semaphore::release) {
handleErrorOnSendFiles(store, e, new StoreFileMetaData[]{chunk.md});
throw e;
}
e -> {
if (error.compareAndSet(null, Tuple.tuple(chunk.md, e)) && semaphore.tryAcquire()) {
// have to fork as handleErrorOnSendFiles can read file which should not happen on the network thread.
sendFileExecutor.execute(this);
}
})
)
)
);
if (canSendMore() == false) {
semaphore.release();
// Here we have to retry before abort to avoid a race situation where the other threads have flipped `canSendMore`
// condition but they are not going to resume the sending process because this thread still holds the semaphore.
if (canSendMore() == false || semaphore.tryAcquire() == false) {
final boolean changed = canSendMore() || error.get() != null;
if (changed == false || semaphore.tryAcquire() == false) {
break;
}
}
Expand All @@ -770,9 +771,6 @@ protected void doRun() throws Exception {

FileChunk readNextChunk() throws Exception {
assert semaphore.availablePermits() == 0;
if (closed) {
throw new IllegalStateException("MultiFileSender was closed");
}
try {
if (currentInput == null) {
if (remainingFiles.hasNext() == false) {
Expand Down Expand Up @@ -816,10 +814,7 @@ boolean canSendMore() {
@Override
public void close() throws IOException {
assert semaphore.availablePermits() == 0;
if (closed == false) {
closed = true;
IOUtils.close(recycledBuffers::clear, currentInput, () -> currentInput = null);
}
IOUtils.close(recycledBuffers::clear, currentInput, () -> currentInput = null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.core.IsNull.notNullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
Expand Down Expand Up @@ -673,8 +672,7 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c
assertThat(error.get(), notNullValue());
assertThat(error.get().getMessage(), containsString("test chunk exception"));
});
assertThat("no more chunks should be sent", sentChunks.get(),
lessThanOrEqualTo(Math.min(totalChunks, maxConcurrentChunks * 2)));
assertThat("no more chunks should be sent", sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks)));
store.close();
}

Expand Down