From d2d7a3ddf5d02f6eba3bbf08a3519331f0926919 Mon Sep 17 00:00:00 2001 From: Yu Date: Mon, 26 Mar 2018 18:28:50 +0200 Subject: [PATCH 1/5] Change bulk's retry condition to be based on RestStatus Previously bulk's retry logic was based on Exception type of the failed response, here we change it to be based on RestStatus, in order to support rest hight level's request. --- .../client/BulkProcessorRetryIT.java | 221 ++++++++++++++++++ .../AbstractAsyncBulkByScrollAction.java | 8 +- .../index/reindex/RetryTests.java | 3 +- .../action/bulk/BulkRequestHandler.java | 4 +- .../org/elasticsearch/action/bulk/Retry.java | 21 +- .../action/bulk/BulkProcessorRetryIT.java | 12 +- .../elasticsearch/action/bulk/RetryTests.java | 9 +- 7 files changed, 250 insertions(+), 28 deletions(-) create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java new file mode 100644 index 0000000000000..ea66b643709a7 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java @@ -0,0 +1,221 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.get.MultiGetRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.RestStatus; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; + +public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase { + + private static final String INDEX_NAME = "index"; + private static final String TYPE_NAME = "type"; + + static { + System.setProperty("tests.rest.cluster", "localhost:9200"); + } + + private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.Listener listener) { + return BulkProcessor.builder(highLevelClient()::bulkAsync, listener); + } + + public void testBulkRejectionLoadWithoutBackoff() throws Exception { + boolean rejectedExecutionExpected = true; + executeBulkRejectionLoad(BackoffPolicy.noBackoff(), rejectedExecutionExpected); + } + + public void testBulkRejectionLoadWithBackoff() throws Throwable { + boolean rejectedExecutionExpected = false; + executeBulkRejectionLoad(BackoffPolicy.exponentialBackoff(), rejectedExecutionExpected); + } + + private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejectedExecutionExpected) throws Exception { + final CorrelatingBackoffPolicy internalPolicy = new CorrelatingBackoffPolicy(backoffPolicy); + final int numberOfAsyncOps = randomIntBetween(600, 700); + final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps); + final Set responses = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + BulkProcessor bulkProcessor = initBulkProcessorBuilder(new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + internalPolicy.logResponse(response); + responses.add(response); + latch.countDown(); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + responses.add(failure); + latch.countDown(); + } + }).setBulkActions(1) + .setConcurrentRequests(randomIntBetween(0, 100)) + .setBackoffPolicy(internalPolicy) + .build(); + + MultiGetRequest multiGetRequest = indexDocs(bulkProcessor, numberOfAsyncOps); + latch.await(10, TimeUnit.SECONDS); + bulkProcessor.close(); + + assertEquals(responses.size(), numberOfAsyncOps); + + boolean rejectedAfterAllRetries = false; + for (Object response : responses) { + if (response instanceof BulkResponse) { + BulkResponse bulkResponse = (BulkResponse) response; + for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { + if (bulkItemResponse.isFailed()) { + BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); + if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { + if (!rejectedExecutionExpected) { + Throwable rootCause = ExceptionsHelper.unwrapCause(failure.getCause()); + Iterator backoffState = internalPolicy.backoffStateFor(bulkResponse); + assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState); + if (backoffState.hasNext()) { + // we're not expecting that we overwhelmed it even once when we maxed out the number of retries + throw new AssertionError("Got rejected although backoff policy would allow more retries", rootCause); + } else { + rejectedAfterAllRetries = true; + logger.debug("We maxed out the number of bulk retries and got rejected (this is ok)."); + } + } + } else { + throw new AssertionError("Unexpected failure with status: " + failure.getStatus()); + } + } + } + } else { + Throwable t = (Throwable) response; + // we're not expecting any other errors + throw new AssertionError("Unexpected failure", t); + } + } + + highLevelClient().indices().refresh(new RefreshRequest()); + int searchResultCount = highLevelClient().multiGet(multiGetRequest).getResponses().length; + + if (rejectedAfterAllRetries) { + assertThat(searchResultCount, lessThan(numberOfAsyncOps)); + } else { + assertThat(searchResultCount, equalTo(numberOfAsyncOps)); + } + + } + + private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) { + MultiGetRequest multiGetRequest = new MultiGetRequest(); + for (int i = 1; i <= numDocs; i++) { + processor.add(new IndexRequest(INDEX_NAME, TYPE_NAME, Integer.toString(i)) + .source(XContentType.JSON, "field", randomRealisticUnicodeOfCodepointLengthBetween(1, 30))); + multiGetRequest.add(INDEX_NAME, TYPE_NAME, Integer.toString(i)); + } + return multiGetRequest; + } + + /** + * Internal helper class to correlate backoff states with bulk responses. This is needed to check whether we maxed out the number + * of retries but still got rejected (which is perfectly fine and can also happen from time to time under heavy load). + * + * This implementation relies on an implementation detail in Retry, namely that the bulk listener is notified on the same thread + * as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code. + */ + private static class CorrelatingBackoffPolicy extends BackoffPolicy { + private final Map> correlations = new ConcurrentHashMap<>(); + // this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the + // thread local to be eligible for garbage collection right after the test to avoid leaks. + private final ThreadLocal> iterators = new ThreadLocal<>(); + + private final BackoffPolicy delegate; + + private CorrelatingBackoffPolicy(BackoffPolicy delegate) { + this.delegate = delegate; + } + + public Iterator backoffStateFor(BulkResponse response) { + return correlations.get(response); + } + + // Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next() + // see also Retry.AbstractRetryHandler#onResponse(). + public void logResponse(BulkResponse response) { + Iterator iterator = iterators.get(); + // did we ever retry? + if (iterator != null) { + // we should correlate any iterator only once + iterators.remove(); + correlations.put(response, iterator); + } + } + + @Override + public Iterator iterator() { + return new CorrelatingIterator(iterators, delegate.iterator()); + } + + private static class CorrelatingIterator implements Iterator { + private final Iterator delegate; + private final ThreadLocal> iterators; + + private CorrelatingIterator(ThreadLocal> iterators, Iterator delegate) { + this.iterators = iterators; + this.delegate = delegate; + } + + @Override + public boolean hasNext() { + // update on every invocation as we might get rescheduled on a different thread. Unfortunately, there is a chance that + // we pollute the thread local map with stale values. Due to the implementation of Retry and the life cycle of the + // enclosing class CorrelatingBackoffPolicy this should not pose a major problem though. + iterators.set(this); + return delegate.hasNext(); + } + + @Override + public TimeValue next() { + // update on every invocation + iterators.set(this); + return delegate.next(); + } + } + } +} diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index c2eb99afa18a8..194fd7ec38bad 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -32,7 +32,6 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.Retry; -import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.ParentTaskAssigningClient; @@ -41,7 +40,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.IndexFieldMapper; @@ -50,6 +48,8 @@ import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.TypeFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; @@ -76,8 +76,8 @@ import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableList; import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff; -import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; +import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES; import static org.elasticsearch.rest.RestStatus.CONFLICT; import static org.elasticsearch.search.sort.SortBuilders.fieldSort; @@ -140,7 +140,7 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, Par this.mainRequest = mainRequest; this.listener = listener; BackoffPolicy backoffPolicy = buildBackoffPolicy(); - bulkRetry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool); + bulkRetry = new Retry(RestStatus.TOO_MANY_REQUESTS, BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool); scrollSource = buildScrollableResultSource(backoffPolicy); scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null"); /* diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java index da0dbf2aae345..f860bc86fdf69 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Netty4Plugin; @@ -183,7 +184,7 @@ private void testCase( bulk.add(client().prepareIndex("source", "test").setSource("foo", "bar " + i)); } - Retry retry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.exponentialBackoff(), client().threadPool()); + Retry retry = new Retry(RestStatus.TOO_MANY_REQUESTS, BackoffPolicy.exponentialBackoff(), client().threadPool()); BulkResponse initialBulkResponse = retry.withBackoff(client()::bulk, bulk.request(), client().settings()).actionGet(); assertFalse(initialBulkResponse.buildFailureMessage(), initialBulkResponse.hasFailures()); client().admin().indices().prepareRefresh("source").get(); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java index adb1d32161fe1..c08896d196bd4 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java @@ -23,7 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.Scheduler; import java.util.concurrent.CountDownLatch; @@ -49,7 +49,7 @@ public final class BulkRequestHandler { this.consumer = consumer; this.listener = listener; this.concurrentRequests = concurrentRequests; - this.retry = new Retry(EsRejectedExecutionException.class, backoffPolicy, scheduler); + this.retry = new Retry(RestStatus.TOO_MANY_REQUESTS, backoffPolicy, scheduler); this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/Retry.java b/server/src/main/java/org/elasticsearch/action/bulk/Retry.java index 9985d23b9badb..13d4ca3ec5ef2 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/Retry.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/Retry.java @@ -19,13 +19,13 @@ package org.elasticsearch.action.bulk; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -40,12 +40,12 @@ * Encapsulates synchronous and asynchronous retry logic. */ public class Retry { - private final Class retryOnThrowable; + private final RestStatus retryOnStatus; private final BackoffPolicy backoffPolicy; private final Scheduler scheduler; - public Retry(Class retryOnThrowable, BackoffPolicy backoffPolicy, Scheduler scheduler) { - this.retryOnThrowable = retryOnThrowable; + public Retry(RestStatus retryOnStatus, BackoffPolicy backoffPolicy, Scheduler scheduler) { + this.retryOnStatus = retryOnStatus; this.backoffPolicy = backoffPolicy; this.scheduler = scheduler; } @@ -60,7 +60,7 @@ public Retry(Class retryOnThrowable, BackoffPolicy backoffP */ public void withBackoff(BiConsumer> consumer, BulkRequest bulkRequest, ActionListener listener, Settings settings) { - RetryHandler r = new RetryHandler(retryOnThrowable, backoffPolicy, consumer, listener, settings, scheduler); + RetryHandler r = new RetryHandler(retryOnStatus, backoffPolicy, consumer, listener, settings, scheduler); r.execute(bulkRequest); } @@ -86,7 +86,7 @@ static class RetryHandler implements ActionListener { private final BiConsumer> consumer; private final ActionListener listener; private final Iterator backoff; - private final Class retryOnThrowable; + private final RestStatus retryOnStatus; // Access only when holding a client-side lock, see also #addResponses() private final List responses = new ArrayList<>(); private final long startTimestampNanos; @@ -95,10 +95,10 @@ static class RetryHandler implements ActionListener { private volatile BulkRequest currentBulkRequest; private volatile ScheduledFuture scheduledRequestFuture; - RetryHandler(Class retryOnThrowable, BackoffPolicy backoffPolicy, + RetryHandler(RestStatus retryOnStatus, BackoffPolicy backoffPolicy, BiConsumer> consumer, ActionListener listener, Settings settings, Scheduler scheduler) { - this.retryOnThrowable = retryOnThrowable; + this.retryOnStatus = retryOnStatus; this.backoff = backoffPolicy.iterator(); this.consumer = consumer; this.listener = listener; @@ -160,9 +160,8 @@ private boolean canRetry(BulkResponse bulkItemResponses) { } for (BulkItemResponse bulkItemResponse : bulkItemResponses) { if (bulkItemResponse.isFailed()) { - final Throwable cause = bulkItemResponse.getFailure().getCause(); - final Throwable rootCause = ExceptionsHelper.unwrapCause(cause); - if (!rootCause.getClass().equals(retryOnThrowable)) { + final RestStatus status = bulkItemResponse.getFailure().getStatus(); + if (this.retryOnStatus != status) { return false; } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index 1a07eac1adbd5..4ce6010b92f4d 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -23,8 +23,8 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; import org.hamcrest.Matcher; @@ -114,9 +114,9 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); - Throwable rootCause = ExceptionsHelper.unwrapCause(failure.getCause()); - if (rootCause instanceof EsRejectedExecutionException) { - if (rejectedExecutionExpected == false) { + if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { + if (!rejectedExecutionExpected) { + Throwable rootCause = ExceptionsHelper.unwrapCause(failure.getCause()); Iterator backoffState = internalPolicy.backoffStateFor(bulkResponse); assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState); if (backoffState.hasNext()) { @@ -127,7 +127,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } } } else { - throw new AssertionError("Unexpected failure", rootCause); + throw new AssertionError("Unexpected failure status: " + failure.getStatus()); } } } @@ -142,7 +142,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) // validate we did not create any duplicates due to retries Matcher searchResultCount; - // it is ok if we lost some index operations to rejected executions (which is possible even when backing off (although less likely) + // it is ok if we lost some index operations to rejected executions (which is possible even when backing off although less likely) searchResultCount = lessThanOrEqualTo((long) numberOfAsyncOps); SearchResponse results = client() diff --git a/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java b/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java index 136097a292668..1fa7c10fb847a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.junit.After; @@ -84,7 +85,7 @@ public void testRetryBacksOff() throws Exception { BackoffPolicy backoff = BackoffPolicy.constantBackoff(DELAY, CALLS_TO_FAIL); BulkRequest bulkRequest = createBulkRequest(); - BulkResponse response = new Retry(EsRejectedExecutionException.class, backoff, bulkClient.threadPool()) + BulkResponse response = new Retry(RestStatus.TOO_MANY_REQUESTS, backoff, bulkClient.threadPool()) .withBackoff(bulkClient::bulk, bulkRequest, bulkClient.settings()) .actionGet(); @@ -96,7 +97,7 @@ public void testRetryFailsAfterBackoff() throws Exception { BackoffPolicy backoff = BackoffPolicy.constantBackoff(DELAY, CALLS_TO_FAIL - 1); BulkRequest bulkRequest = createBulkRequest(); - BulkResponse response = new Retry(EsRejectedExecutionException.class, backoff, bulkClient.threadPool()) + BulkResponse response = new Retry(RestStatus.TOO_MANY_REQUESTS, backoff, bulkClient.threadPool()) .withBackoff(bulkClient::bulk, bulkRequest, bulkClient.settings()) .actionGet(); @@ -109,7 +110,7 @@ public void testRetryWithListenerBacksOff() throws Exception { AssertingListener listener = new AssertingListener(); BulkRequest bulkRequest = createBulkRequest(); - Retry retry = new Retry(EsRejectedExecutionException.class, backoff, bulkClient.threadPool()); + Retry retry = new Retry(RestStatus.TOO_MANY_REQUESTS, backoff, bulkClient.threadPool()); retry.withBackoff(bulkClient::bulk, bulkRequest, listener, bulkClient.settings()); listener.awaitCallbacksCalled(); @@ -124,7 +125,7 @@ public void testRetryWithListenerFailsAfterBacksOff() throws Exception { AssertingListener listener = new AssertingListener(); BulkRequest bulkRequest = createBulkRequest(); - Retry retry = new Retry(EsRejectedExecutionException.class, backoff, bulkClient.threadPool()); + Retry retry = new Retry(RestStatus.TOO_MANY_REQUESTS, backoff, bulkClient.threadPool()); retry.withBackoff(bulkClient::bulk, bulkRequest, listener, bulkClient.settings()); listener.awaitCallbacksCalled(); From 5be35551b3c50860d16a3426f2420af6ab7f9373 Mon Sep 17 00:00:00 2001 From: Yu Date: Wed, 4 Apr 2018 22:05:37 +0200 Subject: [PATCH 2/5] changes according to comments --- .../client/BulkProcessorRetryIT.java | 17 ++++++----------- .../AbstractAsyncBulkByScrollAction.java | 3 +-- .../index/reindex/RetryTests.java | 3 +-- .../action/bulk/BulkRequestHandler.java | 3 +-- .../org/elasticsearch/action/bulk/Retry.java | 19 ++++++++----------- .../action/bulk/BulkProcessorRetryIT.java | 17 +++++++++-------- .../elasticsearch/action/bulk/RetryTests.java | 9 ++++----- 7 files changed, 30 insertions(+), 41 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java index ea66b643709a7..417116eaf6950 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.client; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -47,10 +46,6 @@ public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase { private static final String INDEX_NAME = "index"; private static final String TYPE_NAME = "type"; - static { - System.setProperty("tests.rest.cluster", "localhost:9200"); - } - private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.Listener listener) { return BulkProcessor.builder(highLevelClient()::bulkAsync, listener); } @@ -107,13 +102,13 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { - if (!rejectedExecutionExpected) { - Throwable rootCause = ExceptionsHelper.unwrapCause(failure.getCause()); + if (rejectedExecutionExpected == false) { Iterator backoffState = internalPolicy.backoffStateFor(bulkResponse); assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState); if (backoffState.hasNext()) { // we're not expecting that we overwhelmed it even once when we maxed out the number of retries - throw new AssertionError("Got rejected although backoff policy would allow more retries", rootCause); + throw new AssertionError("Got rejected although backoff policy would allow more retries", + failure.getCause()); } else { rejectedAfterAllRetries = true; logger.debug("We maxed out the number of bulk retries and got rejected (this is ok)."); @@ -132,12 +127,12 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } highLevelClient().indices().refresh(new RefreshRequest()); - int searchResultCount = highLevelClient().multiGet(multiGetRequest).getResponses().length; + int multiGetResponsesCount = highLevelClient().multiGet(multiGetRequest).getResponses().length; if (rejectedAfterAllRetries) { - assertThat(searchResultCount, lessThan(numberOfAsyncOps)); + assertThat(multiGetResponsesCount, lessThan(numberOfAsyncOps)); } else { - assertThat(searchResultCount, equalTo(numberOfAsyncOps)); + assertThat(multiGetResponsesCount, equalTo(numberOfAsyncOps)); } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index 194fd7ec38bad..46ea9238434a7 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -49,7 +49,6 @@ import org.elasticsearch.index.mapper.TypeFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; @@ -140,7 +139,7 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, Par this.mainRequest = mainRequest; this.listener = listener; BackoffPolicy backoffPolicy = buildBackoffPolicy(); - bulkRetry = new Retry(RestStatus.TOO_MANY_REQUESTS, BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool); + bulkRetry = new Retry(BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool); scrollSource = buildScrollableResultSource(backoffPolicy); scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null"); /* diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java index f860bc86fdf69..07786fa568749 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Netty4Plugin; @@ -184,7 +183,7 @@ private void testCase( bulk.add(client().prepareIndex("source", "test").setSource("foo", "bar " + i)); } - Retry retry = new Retry(RestStatus.TOO_MANY_REQUESTS, BackoffPolicy.exponentialBackoff(), client().threadPool()); + Retry retry = new Retry(BackoffPolicy.exponentialBackoff(), client().threadPool()); BulkResponse initialBulkResponse = retry.withBackoff(client()::bulk, bulk.request(), client().settings()).actionGet(); assertFalse(initialBulkResponse.buildFailureMessage(), initialBulkResponse.hasFailures()); client().admin().indices().prepareRefresh("source").get(); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java index c08896d196bd4..d02173ca370bd 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.Scheduler; import java.util.concurrent.CountDownLatch; @@ -49,7 +48,7 @@ public final class BulkRequestHandler { this.consumer = consumer; this.listener = listener; this.concurrentRequests = concurrentRequests; - this.retry = new Retry(RestStatus.TOO_MANY_REQUESTS, backoffPolicy, scheduler); + this.retry = new Retry(backoffPolicy, scheduler); this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/Retry.java b/server/src/main/java/org/elasticsearch/action/bulk/Retry.java index 13d4ca3ec5ef2..bee712cf33833 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/Retry.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/Retry.java @@ -40,12 +40,10 @@ * Encapsulates synchronous and asynchronous retry logic. */ public class Retry { - private final RestStatus retryOnStatus; private final BackoffPolicy backoffPolicy; private final Scheduler scheduler; - public Retry(RestStatus retryOnStatus, BackoffPolicy backoffPolicy, Scheduler scheduler) { - this.retryOnStatus = retryOnStatus; + public Retry(BackoffPolicy backoffPolicy, Scheduler scheduler) { this.backoffPolicy = backoffPolicy; this.scheduler = scheduler; } @@ -60,7 +58,7 @@ public Retry(RestStatus retryOnStatus, BackoffPolicy backoffPolicy, Scheduler sc */ public void withBackoff(BiConsumer> consumer, BulkRequest bulkRequest, ActionListener listener, Settings settings) { - RetryHandler r = new RetryHandler(retryOnStatus, backoffPolicy, consumer, listener, settings, scheduler); + RetryHandler r = new RetryHandler(backoffPolicy, consumer, listener, settings, scheduler); r.execute(bulkRequest); } @@ -81,12 +79,13 @@ public PlainActionFuture withBackoff(BiConsumer { + private static final RestStatus RETRY_STATUS = RestStatus.TOO_MANY_REQUESTS; + private final Logger logger; private final Scheduler scheduler; private final BiConsumer> consumer; private final ActionListener listener; private final Iterator backoff; - private final RestStatus retryOnStatus; // Access only when holding a client-side lock, see also #addResponses() private final List responses = new ArrayList<>(); private final long startTimestampNanos; @@ -95,10 +94,8 @@ static class RetryHandler implements ActionListener { private volatile BulkRequest currentBulkRequest; private volatile ScheduledFuture scheduledRequestFuture; - RetryHandler(RestStatus retryOnStatus, BackoffPolicy backoffPolicy, - BiConsumer> consumer, ActionListener listener, - Settings settings, Scheduler scheduler) { - this.retryOnStatus = retryOnStatus; + RetryHandler(BackoffPolicy backoffPolicy, BiConsumer> consumer, + ActionListener listener, Settings settings, Scheduler scheduler) { this.backoff = backoffPolicy.iterator(); this.consumer = consumer; this.listener = listener; @@ -160,8 +157,8 @@ private boolean canRetry(BulkResponse bulkItemResponses) { } for (BulkItemResponse bulkItemResponse : bulkItemResponses) { if (bulkItemResponse.isFailed()) { - final RestStatus status = bulkItemResponse.getFailure().getStatus(); - if (this.retryOnStatus != status) { + final RestStatus status = bulkItemResponse.status(); + if (status == RETRY_STATUS) { return false; } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index 4ce6010b92f4d..5d4dc6d3e0fdb 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -26,7 +26,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; -import org.hamcrest.Matcher; import java.util.Collections; import java.util.Iterator; @@ -38,7 +37,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2) public class BulkProcessorRetryIT extends ESIntegTestCase { @@ -108,6 +107,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) assertThat(responses.size(), equalTo(numberOfAsyncOps)); // validate all responses + boolean rejectedAfterAllRetries = false; for (Object response : responses) { if (response instanceof BulkResponse) { BulkResponse bulkResponse = (BulkResponse) response; @@ -123,6 +123,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) // we're not expecting that we overwhelmed it even once when we maxed out the number of retries throw new AssertionError("Got rejected although backoff policy would allow more retries", rootCause); } else { + rejectedAfterAllRetries = true; logger.debug("We maxed out the number of bulk retries and got rejected (this is ok)."); } } @@ -140,18 +141,18 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) client().admin().indices().refresh(new RefreshRequest()).get(); - // validate we did not create any duplicates due to retries - Matcher searchResultCount; - // it is ok if we lost some index operations to rejected executions (which is possible even when backing off although less likely) - searchResultCount = lessThanOrEqualTo((long) numberOfAsyncOps); - SearchResponse results = client() .prepareSearch(INDEX_NAME) .setTypes(TYPE_NAME) .setQuery(QueryBuilders.matchAllQuery()) .setSize(0) .get(); - assertThat(results.getHits().getTotalHits(), searchResultCount); + + if (rejectedAfterAllRetries) { + assertThat((int) results.getHits().getTotalHits(), lessThan(numberOfAsyncOps)); + } else { + assertThat(results.getHits().getTotalHits(), equalTo(numberOfAsyncOps)); + } } private static void indexDocs(BulkProcessor processor, int numDocs) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java b/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java index 1fa7c10fb847a..320f11ff6d04f 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.junit.After; @@ -85,7 +84,7 @@ public void testRetryBacksOff() throws Exception { BackoffPolicy backoff = BackoffPolicy.constantBackoff(DELAY, CALLS_TO_FAIL); BulkRequest bulkRequest = createBulkRequest(); - BulkResponse response = new Retry(RestStatus.TOO_MANY_REQUESTS, backoff, bulkClient.threadPool()) + BulkResponse response = new Retry(backoff, bulkClient.threadPool()) .withBackoff(bulkClient::bulk, bulkRequest, bulkClient.settings()) .actionGet(); @@ -97,7 +96,7 @@ public void testRetryFailsAfterBackoff() throws Exception { BackoffPolicy backoff = BackoffPolicy.constantBackoff(DELAY, CALLS_TO_FAIL - 1); BulkRequest bulkRequest = createBulkRequest(); - BulkResponse response = new Retry(RestStatus.TOO_MANY_REQUESTS, backoff, bulkClient.threadPool()) + BulkResponse response = new Retry(backoff, bulkClient.threadPool()) .withBackoff(bulkClient::bulk, bulkRequest, bulkClient.settings()) .actionGet(); @@ -110,7 +109,7 @@ public void testRetryWithListenerBacksOff() throws Exception { AssertingListener listener = new AssertingListener(); BulkRequest bulkRequest = createBulkRequest(); - Retry retry = new Retry(RestStatus.TOO_MANY_REQUESTS, backoff, bulkClient.threadPool()); + Retry retry = new Retry(backoff, bulkClient.threadPool()); retry.withBackoff(bulkClient::bulk, bulkRequest, listener, bulkClient.settings()); listener.awaitCallbacksCalled(); @@ -125,7 +124,7 @@ public void testRetryWithListenerFailsAfterBacksOff() throws Exception { AssertingListener listener = new AssertingListener(); BulkRequest bulkRequest = createBulkRequest(); - Retry retry = new Retry(RestStatus.TOO_MANY_REQUESTS, backoff, bulkClient.threadPool()); + Retry retry = new Retry(backoff, bulkClient.threadPool()); retry.withBackoff(bulkClient::bulk, bulkRequest, listener, bulkClient.settings()); listener.awaitCallbacksCalled(); From b830333db843002e02a45657daef4b4a5b07f7f2 Mon Sep 17 00:00:00 2001 From: Yu Date: Tue, 17 Apr 2018 22:17:54 +0200 Subject: [PATCH 3/5] small changes --- .../src/main/java/org/elasticsearch/action/bulk/Retry.java | 2 +- .../elasticsearch/action/bulk/BulkProcessorRetryIT.java | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/Retry.java b/server/src/main/java/org/elasticsearch/action/bulk/Retry.java index bee712cf33833..75a1a2d5f8daa 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/Retry.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/Retry.java @@ -158,7 +158,7 @@ private boolean canRetry(BulkResponse bulkItemResponses) { for (BulkItemResponse bulkItemResponse : bulkItemResponses) { if (bulkItemResponse.isFailed()) { final RestStatus status = bulkItemResponse.status(); - if (status == RETRY_STATUS) { + if (status != RETRY_STATUS) { return false; } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index 5d4dc6d3e0fdb..2f61be9707320 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.action.bulk; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.settings.Settings; @@ -115,13 +114,13 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { - if (!rejectedExecutionExpected) { - Throwable rootCause = ExceptionsHelper.unwrapCause(failure.getCause()); + if (rejectedExecutionExpected == false) { Iterator backoffState = internalPolicy.backoffStateFor(bulkResponse); assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState); if (backoffState.hasNext()) { // we're not expecting that we overwhelmed it even once when we maxed out the number of retries - throw new AssertionError("Got rejected although backoff policy would allow more retries", rootCause); + throw new AssertionError("Got rejected although backoff policy would allow more retries", + failure.getCause()); } else { rejectedAfterAllRetries = true; logger.debug("We maxed out the number of bulk retries and got rejected (this is ok)."); From 8a664496717396067ae4648494279d5b2901d2be Mon Sep 17 00:00:00 2001 From: Yu Date: Tue, 17 Apr 2018 22:27:48 +0200 Subject: [PATCH 4/5] still a change --- .../org/elasticsearch/action/bulk/BulkProcessorRetryIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index 2f61be9707320..02ecb311629cf 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -150,7 +150,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) if (rejectedAfterAllRetries) { assertThat((int) results.getHits().getTotalHits(), lessThan(numberOfAsyncOps)); } else { - assertThat(results.getHits().getTotalHits(), equalTo(numberOfAsyncOps)); + assertThat((int) results.getHits().getTotalHits(), equalTo(numberOfAsyncOps)); } } From 4223fadd2843bded1c6c30d371ae4fff10ec8c50 Mon Sep 17 00:00:00 2001 From: Yu Date: Sat, 21 Apr 2018 17:36:18 +0200 Subject: [PATCH 5/5] Address a test failure --- .../java/org/elasticsearch/client/BulkProcessorRetryIT.java | 5 ++++- .../org/elasticsearch/action/bulk/BulkProcessorRetryIT.java | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java index 417116eaf6950..597d35a99967b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java @@ -40,6 +40,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase { @@ -129,7 +130,9 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) highLevelClient().indices().refresh(new RefreshRequest()); int multiGetResponsesCount = highLevelClient().multiGet(multiGetRequest).getResponses().length; - if (rejectedAfterAllRetries) { + if (rejectedExecutionExpected) { + assertThat(multiGetResponsesCount, lessThanOrEqualTo(numberOfAsyncOps)); + } else if (rejectedAfterAllRetries) { assertThat(multiGetResponsesCount, lessThan(numberOfAsyncOps)); } else { assertThat(multiGetResponsesCount, equalTo(numberOfAsyncOps)); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index 3bd53dbf13baf..f1731083ae376 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -37,6 +37,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2) public class BulkProcessorRetryIT extends ESIntegTestCase { @@ -147,7 +148,9 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) .setSize(0) .get(); - if (rejectedAfterAllRetries) { + if (rejectedExecutionExpected) { + assertThat((int) results.getHits().getTotalHits(), lessThanOrEqualTo(numberOfAsyncOps)); + } else if (rejectedAfterAllRetries) { assertThat((int) results.getHits().getTotalHits(), lessThan(numberOfAsyncOps)); } else { assertThat((int) results.getHits().getTotalHits(), equalTo(numberOfAsyncOps));