|
12 | 12 | import org.elasticsearch.common.settings.Settings;
|
13 | 13 | import org.elasticsearch.test.ESTestCase;
|
14 | 14 |
|
| 15 | +import java.util.Collections; |
| 16 | +import java.util.Comparator; |
15 | 17 | import java.util.HashSet;
|
| 18 | +import java.util.LinkedList; |
16 | 19 | import java.util.List;
|
17 | 20 | import java.util.Set;
|
| 21 | +import java.util.TreeSet; |
18 | 22 | import java.util.concurrent.CountDownLatch;
|
19 | 23 | import java.util.concurrent.Semaphore;
|
20 | 24 | import java.util.concurrent.atomic.AtomicBoolean;
|
|
26 | 30 | import static org.hamcrest.Matchers.empty;
|
27 | 31 | import static org.hamcrest.Matchers.equalTo;
|
28 | 32 | import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
| 33 | +import static org.hamcrest.Matchers.hasSize; |
29 | 34 | import static org.hamcrest.Matchers.is;
|
30 | 35 | import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
31 | 36 |
|
@@ -408,6 +413,58 @@ public void testThreadSafety() throws InterruptedException {
|
408 | 413 | checkThread.join();
|
409 | 414 | }
|
410 | 415 |
|
| 416 | + public void testCompletedRanges() { |
| 417 | + final byte[] fileContents = new byte[between(0, 1000)]; |
| 418 | + final SparseFileTracker sparseFileTracker = new SparseFileTracker("test", fileContents.length); |
| 419 | + |
| 420 | + final Set<AtomicBoolean> listenersCalled = new HashSet<>(); |
| 421 | + final Set<SparseFileTracker.Gap> gapsProcessed = Collections.synchronizedSet( |
| 422 | + new TreeSet<>(Comparator.comparingLong(SparseFileTracker.Gap::start)) |
| 423 | + ); |
| 424 | + for (int i = between(0, 10); i > 0; i--) { |
| 425 | + waitForRandomRange(fileContents, sparseFileTracker, listenersCalled::add, gap -> { |
| 426 | + if (processGap(fileContents, gap)) { |
| 427 | + gapsProcessed.add(gap); |
| 428 | + } |
| 429 | + }); |
| 430 | + assertTrue(listenersCalled.stream().allMatch(AtomicBoolean::get)); |
| 431 | + } |
| 432 | + |
| 433 | + // merge adjacent processed ranges as the SparseFileTracker does internally when a gap is completed |
| 434 | + // in order to check that SparseFileTracker.getCompletedRanges() returns the expected values |
| 435 | + final List<Tuple<Long, Long>> expectedCompletedRanges = gapsProcessed.stream() |
| 436 | + .map(gap -> Tuple.tuple(gap.start(), gap.end())) |
| 437 | + .collect(LinkedList::new, (gaps, gap) -> { |
| 438 | + if (gaps.isEmpty()) { |
| 439 | + gaps.add(gap); |
| 440 | + } else { |
| 441 | + final Tuple<Long, Long> previous = gaps.removeLast(); |
| 442 | + if (previous.v2().equals(gap.v1())) { |
| 443 | + gaps.add(Tuple.tuple(previous.v1(), gap.v2())); |
| 444 | + } else { |
| 445 | + gaps.add(previous); |
| 446 | + gaps.add(gap); |
| 447 | + } |
| 448 | + } |
| 449 | + }, (gaps1, gaps2) -> { |
| 450 | + if (gaps1.isEmpty() == false && gaps2.isEmpty() == false) { |
| 451 | + final Tuple<Long, Long> last = gaps1.removeLast(); |
| 452 | + final Tuple<Long, Long> first = gaps2.removeFirst(); |
| 453 | + if (last.v2().equals(first.v1())) { |
| 454 | + gaps1.add(Tuple.tuple(last.v1(), first.v2())); |
| 455 | + } else { |
| 456 | + gaps1.add(last); |
| 457 | + gaps2.add(first); |
| 458 | + } |
| 459 | + } |
| 460 | + gaps1.addAll(gaps2); |
| 461 | + }); |
| 462 | + |
| 463 | + final List<Tuple<Long, Long>> completedRanges = sparseFileTracker.getCompletedRanges(); |
| 464 | + assertThat(completedRanges, hasSize(expectedCompletedRanges.size())); |
| 465 | + assertThat(completedRanges, equalTo(expectedCompletedRanges)); |
| 466 | + } |
| 467 | + |
411 | 468 | private static void checkRandomAbsentRange(byte[] fileContents, SparseFileTracker sparseFileTracker, boolean expectExact) {
|
412 | 469 | final long checkStart = randomLongBetween(0, fileContents.length - 1);
|
413 | 470 | final long checkEnd = randomLongBetween(0, fileContents.length);
|
@@ -487,19 +544,21 @@ public void onFailure(Exception e) {
|
487 | 544 | }
|
488 | 545 | }
|
489 | 546 |
|
490 |
| - private static void processGap(byte[] fileContents, SparseFileTracker.Gap gap) { |
| 547 | + private static boolean processGap(byte[] fileContents, SparseFileTracker.Gap gap) { |
491 | 548 | for (long i = gap.start(); i < gap.end(); i++) {
|
492 | 549 | assertThat(fileContents[toIntBytes(i)], equalTo(UNAVAILABLE));
|
493 | 550 | }
|
494 | 551 |
|
495 | 552 | if (randomBoolean()) {
|
496 | 553 | gap.onFailure(new ElasticsearchException("simulated"));
|
| 554 | + return false; |
497 | 555 | } else {
|
498 | 556 | for (long i = gap.start(); i < gap.end(); i++) {
|
499 | 557 | fileContents[toIntBytes(i)] = AVAILABLE;
|
500 | 558 | gap.onProgress(i + 1L);
|
501 | 559 | }
|
502 | 560 | gap.onCompletion();
|
| 561 | + return true; |
503 | 562 | }
|
504 | 563 | }
|
505 | 564 | }
|
0 commit comments