@@ -697,6 +697,7 @@ private class MultiFileSender extends ActionRunnable<Void> implements Closeable
697
697
private final IntSupplier translogOps ;
698
698
private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker (NO_OPS_PERFORMED , NO_OPS_PERFORMED );
699
699
private final Semaphore semaphore = new Semaphore (0 );
700
+ private MultiFileReader .FileChunk currentChunk ;
700
701
701
702
private MultiFileSender (Store store , IntSupplier translogOps , StoreFileMetaData [] files , ActionListener <Void > listener ) {
702
703
super (ActionListener .notifyOnce (listener ));
@@ -711,21 +712,16 @@ protected void doRun() throws Exception {
711
712
assert Transports .assertNotTransportThread (RecoverySourceHandler .this + "[send file chunk]" );
712
713
for (; ; ) {
713
714
assert semaphore .availablePermits () == 0 ;
714
- final MultiFileReader .FileChunk chunk ;
715
- try {
716
- chunk = multiFileReader .readNextChunk ();
717
- } catch (IOException e ) {
718
- handleErrorOnSendFiles (store , e , new StoreFileMetaData []{ multiFileReader .currentFile () });
719
- throw e ;
720
- }
721
- if (chunk == null ) {
715
+ final MultiFileReader .FileChunk chunk = readNextChunk ();
716
+ if (currentChunk == null ) {
722
717
semaphore .release (); // allow other threads respond if we are not done yet.
723
718
if (requestSeqIdTracker .getMaxSeqNo () == requestSeqIdTracker .getProcessedCheckpoint () && semaphore .tryAcquire ()) {
724
719
listener .onResponse (null );
725
720
}
726
721
break ;
727
722
}
728
723
final long requestSeqId = requestSeqIdTracker .generateSeqNo ();
724
+ currentChunk = null ;
729
725
cancellableThreads .execute (() ->
730
726
recoveryTarget .writeFileChunk (chunk .md , chunk .position , chunk .content , chunk .lastChunk , translogOps .getAsInt (),
731
727
ActionListener .wrap (
@@ -744,6 +740,7 @@ protected void doRun() throws Exception {
744
740
)
745
741
);
746
742
if (canSendMore () == false ) {
743
+ readNextChunk (); // read ahead while we're waiting for acknowledgements
747
744
semaphore .release ();
748
745
// Here we have to retry before abort to avoid a race situation where the other threads have flipped `canSendMore`
749
746
// condition but they are not going to resume the sending process because this thread still holds the semaphore.
@@ -754,6 +751,19 @@ protected void doRun() throws Exception {
754
751
}
755
752
}
756
753
754
+ private MultiFileReader .FileChunk readNextChunk () throws Exception {
755
+ assert semaphore .availablePermits () == 0 ;
756
+ if (currentChunk == null ) {
757
+ try {
758
+ currentChunk = multiFileReader .readNextChunk ();
759
+ } catch (IOException e ) {
760
+ handleErrorOnSendFiles (store , e , new StoreFileMetaData []{multiFileReader .currentFile ()});
761
+ throw e ;
762
+ }
763
+ }
764
+ return currentChunk ;
765
+ }
766
+
757
767
private boolean canSendMore () {
758
768
return requestSeqIdTracker .getMaxSeqNo () - requestSeqIdTracker .getProcessedCheckpoint () < maxConcurrentFileChunks ;
759
769
}
0 commit comments