|
64 | 64 | import java.util.concurrent.BrokenBarrierException;
|
65 | 65 | import java.util.concurrent.CountDownLatch;
|
66 | 66 | import java.util.concurrent.CyclicBarrier;
|
| 67 | +import java.util.concurrent.TimeUnit; |
67 | 68 | import java.util.stream.Collectors;
|
68 | 69 | import java.util.stream.IntStream;
|
69 | 70 | import java.util.stream.Stream;
|
@@ -700,32 +701,34 @@ private void assertTotalHits(String indexName, TotalHits originalAllHits, TotalH
|
700 | 701 | }
|
701 | 702 | }
|
702 | 703 |
|
703 |
| - private void assertRecoveryStats(String indexName, boolean preWarmEnabled) { |
| 704 | + private void assertRecoveryStats(String indexName, boolean preWarmEnabled) throws Exception { |
704 | 705 | int shardCount = getNumShards(indexName).totalNumShards;
|
705 |
| - final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(indexName).get(); |
706 |
| - assertThat(recoveryResponse.toString(), recoveryResponse.shardRecoveryStates().get(indexName).size(), equalTo(shardCount)); |
707 |
| - |
708 |
| - for (List<RecoveryState> recoveryStates : recoveryResponse.shardRecoveryStates().values()) { |
709 |
| - for (RecoveryState recoveryState : recoveryStates) { |
710 |
| - ByteSizeValue cacheSize = getCacheSizeForNode(recoveryState.getTargetNode().getName()); |
711 |
| - boolean unboundedCache = cacheSize.equals(new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)); |
712 |
| - RecoveryState.Index index = recoveryState.getIndex(); |
713 |
| - assertThat( |
714 |
| - Strings.toString(recoveryState, true, true), |
715 |
| - index.recoveredFileCount(), |
716 |
| - preWarmEnabled && unboundedCache ? equalTo(index.totalRecoverFiles()) : greaterThanOrEqualTo(0) |
717 |
| - ); |
| 706 | + assertBusy(() -> { |
| 707 | + final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(indexName).get(); |
| 708 | + assertThat(recoveryResponse.toString(), recoveryResponse.shardRecoveryStates().get(indexName).size(), equalTo(shardCount)); |
| 709 | + |
| 710 | + for (List<RecoveryState> recoveryStates : recoveryResponse.shardRecoveryStates().values()) { |
| 711 | + for (RecoveryState recoveryState : recoveryStates) { |
| 712 | + ByteSizeValue cacheSize = getCacheSizeForNode(recoveryState.getTargetNode().getName()); |
| 713 | + boolean unboundedCache = cacheSize.equals(new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)); |
| 714 | + RecoveryState.Index index = recoveryState.getIndex(); |
| 715 | + assertThat( |
| 716 | + Strings.toString(recoveryState, true, true), |
| 717 | + index.recoveredFileCount(), |
| 718 | + preWarmEnabled && unboundedCache ? equalTo(index.totalRecoverFiles()) : greaterThanOrEqualTo(0) |
| 719 | + ); |
718 | 720 |
|
719 |
| - // Since the cache size is variable, the pre-warm phase might fail as some of the files can be evicted |
720 |
| - // while a part is pre-fetched, in that case the recovery state stage is left as FINALIZE. |
721 |
| - assertThat( |
722 |
| - recoveryState.getStage(), |
723 |
| - unboundedCache |
724 |
| - ? equalTo(RecoveryState.Stage.DONE) |
725 |
| - : anyOf(equalTo(RecoveryState.Stage.DONE), equalTo(RecoveryState.Stage.FINALIZE)) |
726 |
| - ); |
| 721 | + // Since the cache size is variable, the pre-warm phase might fail as some of the files can be evicted |
| 722 | + // while a part is pre-fetched, in that case the recovery state stage is left as FINALIZE. |
| 723 | + assertThat( |
| 724 | + recoveryState.getStage(), |
| 725 | + unboundedCache |
| 726 | + ? equalTo(RecoveryState.Stage.DONE) |
| 727 | + : anyOf(equalTo(RecoveryState.Stage.DONE), equalTo(RecoveryState.Stage.FINALIZE)) |
| 728 | + ); |
| 729 | + } |
727 | 730 | }
|
728 |
| - } |
| 731 | + }, 30L, TimeUnit.SECONDS); |
729 | 732 | }
|
730 | 733 |
|
731 | 734 | private void assertSearchableSnapshotStats(String indexName, boolean cacheEnabled, List<String> nonCachedExtensions) {
|
|
0 commit comments