Skip to content

Commit d72cc89

Browse files
PnPie authored and javanna committed
BulkProcessor to retry based on status code (#29329)
Previously `BulkProcessor` retry logic was based on the exception type of the failed response (`EsRejectedExecutionException`). This commit changes it to be based on the returned status code. This allows us to reproduce the same retry behaviour when the `BulkProcessor` is used from the high-level REST client, which was previously not the case as we cannot rebuild the same exception type when parsing back the response. This change has no effect on the transport client. Closes #28885
1 parent bbde5b7 commit d72cc89

File tree

7 files changed

+253
-37
lines changed

7 files changed

+253
-37
lines changed
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client;
20+
21+
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
22+
import org.elasticsearch.action.bulk.BackoffPolicy;
23+
import org.elasticsearch.action.bulk.BulkItemResponse;
24+
import org.elasticsearch.action.bulk.BulkProcessor;
25+
import org.elasticsearch.action.bulk.BulkRequest;
26+
import org.elasticsearch.action.bulk.BulkResponse;
27+
import org.elasticsearch.action.get.MultiGetRequest;
28+
import org.elasticsearch.action.index.IndexRequest;
29+
import org.elasticsearch.common.unit.TimeValue;
30+
import org.elasticsearch.common.xcontent.XContentType;
31+
import org.elasticsearch.rest.RestStatus;
32+
33+
import java.util.Collections;
34+
import java.util.Iterator;
35+
import java.util.Map;
36+
import java.util.Set;
37+
import java.util.concurrent.ConcurrentHashMap;
38+
import java.util.concurrent.CountDownLatch;
39+
import java.util.concurrent.TimeUnit;
40+
41+
import static org.hamcrest.Matchers.equalTo;
42+
import static org.hamcrest.Matchers.lessThan;
43+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
44+
45+
public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase {
46+
47+
private static final String INDEX_NAME = "index";
48+
private static final String TYPE_NAME = "type";
49+
50+
private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.Listener listener) {
51+
return BulkProcessor.builder(highLevelClient()::bulkAsync, listener);
52+
}
53+
54+
public void testBulkRejectionLoadWithoutBackoff() throws Exception {
55+
boolean rejectedExecutionExpected = true;
56+
executeBulkRejectionLoad(BackoffPolicy.noBackoff(), rejectedExecutionExpected);
57+
}
58+
59+
public void testBulkRejectionLoadWithBackoff() throws Throwable {
60+
boolean rejectedExecutionExpected = false;
61+
executeBulkRejectionLoad(BackoffPolicy.exponentialBackoff(), rejectedExecutionExpected);
62+
}
63+
64+
private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejectedExecutionExpected) throws Exception {
65+
final CorrelatingBackoffPolicy internalPolicy = new CorrelatingBackoffPolicy(backoffPolicy);
66+
final int numberOfAsyncOps = randomIntBetween(600, 700);
67+
final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps);
68+
final Set<Object> responses = Collections.newSetFromMap(new ConcurrentHashMap<>());
69+
70+
BulkProcessor bulkProcessor = initBulkProcessorBuilder(new BulkProcessor.Listener() {
71+
@Override
72+
public void beforeBulk(long executionId, BulkRequest request) {
73+
}
74+
75+
@Override
76+
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
77+
internalPolicy.logResponse(response);
78+
responses.add(response);
79+
latch.countDown();
80+
}
81+
82+
@Override
83+
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
84+
responses.add(failure);
85+
latch.countDown();
86+
}
87+
}).setBulkActions(1)
88+
.setConcurrentRequests(randomIntBetween(0, 100))
89+
.setBackoffPolicy(internalPolicy)
90+
.build();
91+
92+
MultiGetRequest multiGetRequest = indexDocs(bulkProcessor, numberOfAsyncOps);
93+
latch.await(10, TimeUnit.SECONDS);
94+
bulkProcessor.close();
95+
96+
assertEquals(responses.size(), numberOfAsyncOps);
97+
98+
boolean rejectedAfterAllRetries = false;
99+
for (Object response : responses) {
100+
if (response instanceof BulkResponse) {
101+
BulkResponse bulkResponse = (BulkResponse) response;
102+
for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
103+
if (bulkItemResponse.isFailed()) {
104+
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
105+
if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) {
106+
if (rejectedExecutionExpected == false) {
107+
Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
108+
assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
109+
if (backoffState.hasNext()) {
110+
// we're not expecting that we overwhelmed it even once when we maxed out the number of retries
111+
throw new AssertionError("Got rejected although backoff policy would allow more retries",
112+
failure.getCause());
113+
} else {
114+
rejectedAfterAllRetries = true;
115+
logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
116+
}
117+
}
118+
} else {
119+
throw new AssertionError("Unexpected failure with status: " + failure.getStatus());
120+
}
121+
}
122+
}
123+
} else {
124+
Throwable t = (Throwable) response;
125+
// we're not expecting any other errors
126+
throw new AssertionError("Unexpected failure", t);
127+
}
128+
}
129+
130+
highLevelClient().indices().refresh(new RefreshRequest());
131+
int multiGetResponsesCount = highLevelClient().multiGet(multiGetRequest).getResponses().length;
132+
133+
if (rejectedExecutionExpected) {
134+
assertThat(multiGetResponsesCount, lessThanOrEqualTo(numberOfAsyncOps));
135+
} else if (rejectedAfterAllRetries) {
136+
assertThat(multiGetResponsesCount, lessThan(numberOfAsyncOps));
137+
} else {
138+
assertThat(multiGetResponsesCount, equalTo(numberOfAsyncOps));
139+
}
140+
141+
}
142+
143+
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) {
144+
MultiGetRequest multiGetRequest = new MultiGetRequest();
145+
for (int i = 1; i <= numDocs; i++) {
146+
processor.add(new IndexRequest(INDEX_NAME, TYPE_NAME, Integer.toString(i))
147+
.source(XContentType.JSON, "field", randomRealisticUnicodeOfCodepointLengthBetween(1, 30)));
148+
multiGetRequest.add(INDEX_NAME, TYPE_NAME, Integer.toString(i));
149+
}
150+
return multiGetRequest;
151+
}
152+
153+
/**
154+
* Internal helper class to correlate backoff states with bulk responses. This is needed to check whether we maxed out the number
155+
* of retries but still got rejected (which is perfectly fine and can also happen from time to time under heavy load).
156+
*
157+
* This implementation relies on an implementation detail in Retry, namely that the bulk listener is notified on the same thread
158+
* as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code.
159+
*/
160+
private static class CorrelatingBackoffPolicy extends BackoffPolicy {
161+
private final Map<BulkResponse, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
162+
// this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the
163+
// thread local to be eligible for garbage collection right after the test to avoid leaks.
164+
private final ThreadLocal<Iterator<TimeValue>> iterators = new ThreadLocal<>();
165+
166+
private final BackoffPolicy delegate;
167+
168+
private CorrelatingBackoffPolicy(BackoffPolicy delegate) {
169+
this.delegate = delegate;
170+
}
171+
172+
public Iterator<TimeValue> backoffStateFor(BulkResponse response) {
173+
return correlations.get(response);
174+
}
175+
176+
// Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next()
177+
// see also Retry.AbstractRetryHandler#onResponse().
178+
public void logResponse(BulkResponse response) {
179+
Iterator<TimeValue> iterator = iterators.get();
180+
// did we ever retry?
181+
if (iterator != null) {
182+
// we should correlate any iterator only once
183+
iterators.remove();
184+
correlations.put(response, iterator);
185+
}
186+
}
187+
188+
@Override
189+
public Iterator<TimeValue> iterator() {
190+
return new CorrelatingIterator(iterators, delegate.iterator());
191+
}
192+
193+
private static class CorrelatingIterator implements Iterator<TimeValue> {
194+
private final Iterator<TimeValue> delegate;
195+
private final ThreadLocal<Iterator<TimeValue>> iterators;
196+
197+
private CorrelatingIterator(ThreadLocal<Iterator<TimeValue>> iterators, Iterator<TimeValue> delegate) {
198+
this.iterators = iterators;
199+
this.delegate = delegate;
200+
}
201+
202+
@Override
203+
public boolean hasNext() {
204+
// update on every invocation as we might get rescheduled on a different thread. Unfortunately, there is a chance that
205+
// we pollute the thread local map with stale values. Due to the implementation of Retry and the life cycle of the
206+
// enclosing class CorrelatingBackoffPolicy this should not pose a major problem though.
207+
iterators.set(this);
208+
return delegate.hasNext();
209+
}
210+
211+
@Override
212+
public TimeValue next() {
213+
// update on every invocation
214+
iterators.set(this);
215+
return delegate.next();
216+
}
217+
}
218+
}
219+
}

modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.action.bulk.BulkRequest;
3333
import org.elasticsearch.action.bulk.BulkResponse;
3434
import org.elasticsearch.action.bulk.Retry;
35-
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
3635
import org.elasticsearch.action.delete.DeleteRequest;
3736
import org.elasticsearch.action.index.IndexRequest;
3837
import org.elasticsearch.client.ParentTaskAssigningClient;
@@ -41,7 +40,6 @@
4140
import org.elasticsearch.common.unit.ByteSizeValue;
4241
import org.elasticsearch.common.unit.TimeValue;
4342
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
44-
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4543
import org.elasticsearch.index.VersionType;
4644
import org.elasticsearch.index.mapper.IdFieldMapper;
4745
import org.elasticsearch.index.mapper.IndexFieldMapper;
@@ -50,6 +48,7 @@
5048
import org.elasticsearch.index.mapper.SourceFieldMapper;
5149
import org.elasticsearch.index.mapper.TypeFieldMapper;
5250
import org.elasticsearch.index.mapper.VersionFieldMapper;
51+
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
5352
import org.elasticsearch.script.ExecutableScript;
5453
import org.elasticsearch.script.Script;
5554
import org.elasticsearch.script.ScriptService;
@@ -76,8 +75,8 @@
7675
import static java.util.Collections.emptyList;
7776
import static java.util.Collections.unmodifiableList;
7877
import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff;
79-
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
8078
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
79+
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
8180
import static org.elasticsearch.rest.RestStatus.CONFLICT;
8281
import static org.elasticsearch.search.sort.SortBuilders.fieldSort;
8382

@@ -140,7 +139,7 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, Par
140139
this.mainRequest = mainRequest;
141140
this.listener = listener;
142141
BackoffPolicy backoffPolicy = buildBackoffPolicy();
143-
bulkRetry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool);
142+
bulkRetry = new Retry(BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool);
144143
scrollSource = buildScrollableResultSource(backoffPolicy);
145144
scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null");
146145
/*

modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ private void testCase(
183183
bulk.add(client().prepareIndex("source", "test").setSource("foo", "bar " + i));
184184
}
185185

186-
Retry retry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.exponentialBackoff(), client().threadPool());
186+
Retry retry = new Retry(BackoffPolicy.exponentialBackoff(), client().threadPool());
187187
BulkResponse initialBulkResponse = retry.withBackoff(client()::bulk, bulk.request(), client().settings()).actionGet();
188188
assertFalse(initialBulkResponse.buildFailureMessage(), initialBulkResponse.hasFailures());
189189
client().admin().indices().prepareRefresh("source").get();

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.common.logging.Loggers;
2525
import org.elasticsearch.common.settings.Settings;
26-
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2726
import org.elasticsearch.threadpool.Scheduler;
2827

2928
import java.util.concurrent.CountDownLatch;
@@ -49,7 +48,7 @@ public final class BulkRequestHandler {
4948
this.consumer = consumer;
5049
this.listener = listener;
5150
this.concurrentRequests = concurrentRequests;
52-
this.retry = new Retry(EsRejectedExecutionException.class, backoffPolicy, scheduler);
51+
this.retry = new Retry(backoffPolicy, scheduler);
5352
this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
5453
}
5554

server/src/main/java/org/elasticsearch/action/bulk/Retry.java

+9-13
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919
package org.elasticsearch.action.bulk;
2020

2121
import org.apache.logging.log4j.Logger;
22-
import org.elasticsearch.ExceptionsHelper;
2322
import org.elasticsearch.action.ActionListener;
2423
import org.elasticsearch.action.support.PlainActionFuture;
2524
import org.elasticsearch.common.logging.Loggers;
2625
import org.elasticsearch.common.settings.Settings;
2726
import org.elasticsearch.common.unit.TimeValue;
2827
import org.elasticsearch.common.util.concurrent.FutureUtils;
28+
import org.elasticsearch.rest.RestStatus;
2929
import org.elasticsearch.threadpool.Scheduler;
3030
import org.elasticsearch.threadpool.ThreadPool;
3131

@@ -40,12 +40,10 @@
4040
* Encapsulates synchronous and asynchronous retry logic.
4141
*/
4242
public class Retry {
43-
private final Class<? extends Throwable> retryOnThrowable;
4443
private final BackoffPolicy backoffPolicy;
4544
private final Scheduler scheduler;
4645

47-
public Retry(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Scheduler scheduler) {
48-
this.retryOnThrowable = retryOnThrowable;
46+
public Retry(BackoffPolicy backoffPolicy, Scheduler scheduler) {
4947
this.backoffPolicy = backoffPolicy;
5048
this.scheduler = scheduler;
5149
}
@@ -60,7 +58,7 @@ public Retry(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffP
6058
*/
6159
public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest,
6260
ActionListener<BulkResponse> listener, Settings settings) {
63-
RetryHandler r = new RetryHandler(retryOnThrowable, backoffPolicy, consumer, listener, settings, scheduler);
61+
RetryHandler r = new RetryHandler(backoffPolicy, consumer, listener, settings, scheduler);
6462
r.execute(bulkRequest);
6563
}
6664

@@ -81,12 +79,13 @@ public PlainActionFuture<BulkResponse> withBackoff(BiConsumer<BulkRequest, Actio
8179
}
8280

8381
static class RetryHandler implements ActionListener<BulkResponse> {
82+
private static final RestStatus RETRY_STATUS = RestStatus.TOO_MANY_REQUESTS;
83+
8484
private final Logger logger;
8585
private final Scheduler scheduler;
8686
private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
8787
private final ActionListener<BulkResponse> listener;
8888
private final Iterator<TimeValue> backoff;
89-
private final Class<? extends Throwable> retryOnThrowable;
9089
// Access only when holding a client-side lock, see also #addResponses()
9190
private final List<BulkItemResponse> responses = new ArrayList<>();
9291
private final long startTimestampNanos;
@@ -95,10 +94,8 @@ static class RetryHandler implements ActionListener<BulkResponse> {
9594
private volatile BulkRequest currentBulkRequest;
9695
private volatile ScheduledFuture<?> scheduledRequestFuture;
9796

98-
RetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy,
99-
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, ActionListener<BulkResponse> listener,
100-
Settings settings, Scheduler scheduler) {
101-
this.retryOnThrowable = retryOnThrowable;
97+
RetryHandler(BackoffPolicy backoffPolicy, BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
98+
ActionListener<BulkResponse> listener, Settings settings, Scheduler scheduler) {
10299
this.backoff = backoffPolicy.iterator();
103100
this.consumer = consumer;
104101
this.listener = listener;
@@ -160,9 +157,8 @@ private boolean canRetry(BulkResponse bulkItemResponses) {
160157
}
161158
for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
162159
if (bulkItemResponse.isFailed()) {
163-
final Throwable cause = bulkItemResponse.getFailure().getCause();
164-
final Throwable rootCause = ExceptionsHelper.unwrapCause(cause);
165-
if (!rootCause.getClass().equals(retryOnThrowable)) {
160+
final RestStatus status = bulkItemResponse.status();
161+
if (status != RETRY_STATUS) {
166162
return false;
167163
}
168164
}

0 commit comments

Comments
 (0)