|
65 | 65 | import java.util.concurrent.BrokenBarrierException;
|
66 | 66 | import java.util.concurrent.CountDownLatch;
|
67 | 67 | import java.util.concurrent.CyclicBarrier;
|
| 68 | +import java.util.concurrent.TimeUnit; |
68 | 69 | import java.util.stream.Collectors;
|
69 | 70 | import java.util.stream.IntStream;
|
70 | 71 | import java.util.stream.Stream;
|
@@ -710,32 +711,34 @@ private void assertTotalHits(String indexName, TotalHits originalAllHits, TotalH
|
710 | 711 | }
|
711 | 712 | }
|
712 | 713 |
|
713 |
| - private void assertRecoveryStats(String indexName, boolean preWarmEnabled) { |
| 714 | + private void assertRecoveryStats(String indexName, boolean preWarmEnabled) throws Exception { |
714 | 715 | int shardCount = getNumShards(indexName).totalNumShards;
|
715 |
| - final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(indexName).get(); |
716 |
| - assertThat(recoveryResponse.toString(), recoveryResponse.shardRecoveryStates().get(indexName).size(), equalTo(shardCount)); |
717 |
| - |
718 |
| - for (List<RecoveryState> recoveryStates : recoveryResponse.shardRecoveryStates().values()) { |
719 |
| - for (RecoveryState recoveryState : recoveryStates) { |
720 |
| - ByteSizeValue cacheSize = getCacheSizeForNode(recoveryState.getTargetNode().getName()); |
721 |
| - boolean unboundedCache = cacheSize.equals(new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)); |
722 |
| - RecoveryState.Index index = recoveryState.getIndex(); |
723 |
| - assertThat( |
724 |
| - Strings.toString(recoveryState, true, true), |
725 |
| - index.recoveredFileCount(), |
726 |
| - preWarmEnabled && unboundedCache ? equalTo(index.totalRecoverFiles()) : greaterThanOrEqualTo(0) |
727 |
| - ); |
| 716 | + assertBusy(() -> { |
| 717 | + final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(indexName).get(); |
| 718 | + assertThat(recoveryResponse.toString(), recoveryResponse.shardRecoveryStates().get(indexName).size(), equalTo(shardCount)); |
| 719 | + |
| 720 | + for (List<RecoveryState> recoveryStates : recoveryResponse.shardRecoveryStates().values()) { |
| 721 | + for (RecoveryState recoveryState : recoveryStates) { |
| 722 | + ByteSizeValue cacheSize = getCacheSizeForNode(recoveryState.getTargetNode().getName()); |
| 723 | + boolean unboundedCache = cacheSize.equals(new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)); |
| 724 | + RecoveryState.Index index = recoveryState.getIndex(); |
| 725 | + assertThat( |
| 726 | + Strings.toString(recoveryState, true, true), |
| 727 | + index.recoveredFileCount(), |
| 728 | + preWarmEnabled && unboundedCache ? equalTo(index.totalRecoverFiles()) : greaterThanOrEqualTo(0) |
| 729 | + ); |
728 | 730 |
|
729 |
| - // Since the cache size is variable, the pre-warm phase might fail as some of the files can be evicted |
730 |
| - // while a part is pre-fetched, in that case the recovery state stage is left as FINALIZE. |
731 |
| - assertThat( |
732 |
| - recoveryState.getStage(), |
733 |
| - unboundedCache |
734 |
| - ? equalTo(RecoveryState.Stage.DONE) |
735 |
| - : anyOf(equalTo(RecoveryState.Stage.DONE), equalTo(RecoveryState.Stage.FINALIZE)) |
736 |
| - ); |
| 731 | + // Since the cache size is variable, the pre-warm phase might fail as some of the files can be evicted |
| 732 | + // while a part is pre-fetched, in that case the recovery state stage is left as FINALIZE. |
| 733 | + assertThat( |
| 734 | + recoveryState.getStage(), |
| 735 | + unboundedCache |
| 736 | + ? equalTo(RecoveryState.Stage.DONE) |
| 737 | + : anyOf(equalTo(RecoveryState.Stage.DONE), equalTo(RecoveryState.Stage.FINALIZE)) |
| 738 | + ); |
| 739 | + } |
737 | 740 | }
|
738 |
| - } |
| 741 | + }, 30L, TimeUnit.SECONDS); |
739 | 742 | }
|
740 | 743 |
|
741 | 744 | private void assertSearchableSnapshotStats(String indexName, boolean cacheEnabled, List<String> nonCachedExtensions) {
|
|
0 commit comments