Skip to content

Commit ffaac5a

Browse files
authored
Simplify BulkProcessor handling and retry logic (#24051)
This commit collapses the SyncBulkRequestHandler and AsyncBulkRequestHandler into a single BulkRequestHandler. The new handler executes a bulk request and awaits for the completion if the BulkProcessor was configured with a concurrentRequests setting of 0. Otherwise the execution happens asynchronously. As part of this change the Retry class has been refactored. withSyncBackoff and withAsyncBackoff have been replaced with two versions of withBackoff. One method takes a listener that will be called on completion. The other method returns a future that will been complete on request completion.
1 parent 99e0268 commit ffaac5a

File tree

6 files changed

+98
-204
lines changed

6 files changed

+98
-204
lines changed

core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,7 @@ public static Builder builder(Client client, Listener listener) {
184184
this.bulkActions = bulkActions;
185185
this.bulkSize = bulkSize.getBytes();
186186
this.bulkRequest = new BulkRequest();
187-
188-
if (concurrentRequests == 0) {
189-
this.bulkRequestHandler = BulkRequestHandler.syncHandler(consumer, backoffPolicy, listener, threadPool);
190-
} else {
191-
this.bulkRequestHandler = BulkRequestHandler.asyncHandler(consumer, backoffPolicy, listener, threadPool, concurrentRequests);
192-
}
187+
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, threadPool, concurrentRequests);
193188

194189
// Start period flushing task after everything is setup
195190
this.cancellableFlushTask = startFlushTask(flushInterval, threadPool);

core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java

Lines changed: 64 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -27,155 +27,86 @@
2727
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2828
import org.elasticsearch.threadpool.ThreadPool;
2929

30+
import java.util.concurrent.CountDownLatch;
3031
import java.util.concurrent.Semaphore;
3132
import java.util.concurrent.TimeUnit;
3233
import java.util.function.BiConsumer;
3334

3435
/**
35-
* Abstracts the low-level details of bulk request handling
36+
* Implements the low-level details of bulk request handling
3637
*/
37-
abstract class BulkRequestHandler {
38-
protected final Logger logger;
39-
protected final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
40-
protected final ThreadPool threadPool;
41-
42-
protected BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, ThreadPool threadPool) {
38+
public final class BulkRequestHandler {
39+
private final Logger logger;
40+
private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
41+
private final BulkProcessor.Listener listener;
42+
private final Semaphore semaphore;
43+
private final Retry retry;
44+
private final int concurrentRequests;
45+
46+
BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
47+
BulkProcessor.Listener listener, ThreadPool threadPool,
48+
int concurrentRequests) {
49+
assert concurrentRequests >= 0;
4350
this.logger = Loggers.getLogger(getClass());
4451
this.consumer = consumer;
45-
this.threadPool = threadPool;
46-
}
47-
48-
49-
public abstract void execute(BulkRequest bulkRequest, long executionId);
50-
51-
public abstract boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
52-
53-
54-
public static BulkRequestHandler syncHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
55-
BackoffPolicy backoffPolicy, BulkProcessor.Listener listener,
56-
ThreadPool threadPool) {
57-
return new SyncBulkRequestHandler(consumer, backoffPolicy, listener, threadPool);
52+
this.listener = listener;
53+
this.concurrentRequests = concurrentRequests;
54+
this.retry = new Retry(EsRejectedExecutionException.class, backoffPolicy, threadPool);
55+
this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
5856
}
5957

60-
public static BulkRequestHandler asyncHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
61-
BackoffPolicy backoffPolicy, BulkProcessor.Listener listener,
62-
ThreadPool threadPool, int concurrentRequests) {
63-
return new AsyncBulkRequestHandler(consumer, backoffPolicy, listener, threadPool, concurrentRequests);
64-
}
65-
66-
private static class SyncBulkRequestHandler extends BulkRequestHandler {
67-
private final BulkProcessor.Listener listener;
68-
private final BackoffPolicy backoffPolicy;
69-
70-
SyncBulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
71-
BulkProcessor.Listener listener, ThreadPool threadPool) {
72-
super(consumer, threadPool);
73-
this.backoffPolicy = backoffPolicy;
74-
this.listener = listener;
75-
}
76-
77-
@Override
78-
public void execute(BulkRequest bulkRequest, long executionId) {
79-
boolean afterCalled = false;
80-
try {
81-
listener.beforeBulk(executionId, bulkRequest);
82-
BulkResponse bulkResponse = Retry
83-
.on(EsRejectedExecutionException.class)
84-
.policy(backoffPolicy)
85-
.using(threadPool)
86-
.withSyncBackoff(consumer, bulkRequest, Settings.EMPTY);
87-
afterCalled = true;
88-
listener.afterBulk(executionId, bulkRequest, bulkResponse);
89-
} catch (InterruptedException e) {
90-
Thread.currentThread().interrupt();
91-
logger.info((Supplier<?>) () -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
92-
if (!afterCalled) {
93-
listener.afterBulk(executionId, bulkRequest, e);
94-
}
95-
} catch (Exception e) {
96-
logger.warn((Supplier<?>) () -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
97-
if (!afterCalled) {
98-
listener.afterBulk(executionId, bulkRequest, e);
58+
public void execute(BulkRequest bulkRequest, long executionId) {
59+
Runnable toRelease = () -> {};
60+
boolean bulkRequestSetupSuccessful = false;
61+
try {
62+
listener.beforeBulk(executionId, bulkRequest);
63+
semaphore.acquire();
64+
toRelease = semaphore::release;
65+
CountDownLatch latch = new CountDownLatch(1);
66+
retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
67+
@Override
68+
public void onResponse(BulkResponse response) {
69+
try {
70+
listener.afterBulk(executionId, bulkRequest, response);
71+
} finally {
72+
semaphore.release();
73+
latch.countDown();
74+
}
9975
}
100-
}
101-
}
10276

103-
@Override
104-
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
105-
// we are "closed" immediately as there is no request in flight
106-
return true;
107-
}
108-
}
109-
110-
private static class AsyncBulkRequestHandler extends BulkRequestHandler {
111-
private final BackoffPolicy backoffPolicy;
112-
private final BulkProcessor.Listener listener;
113-
private final Semaphore semaphore;
114-
private final int concurrentRequests;
115-
116-
private AsyncBulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
117-
BulkProcessor.Listener listener, ThreadPool threadPool,
118-
int concurrentRequests) {
119-
super(consumer, threadPool);
120-
this.backoffPolicy = backoffPolicy;
121-
assert concurrentRequests > 0;
122-
this.listener = listener;
123-
this.concurrentRequests = concurrentRequests;
124-
this.semaphore = new Semaphore(concurrentRequests);
125-
}
126-
127-
@Override
128-
public void execute(BulkRequest bulkRequest, long executionId) {
129-
boolean bulkRequestSetupSuccessful = false;
130-
boolean acquired = false;
131-
try {
132-
listener.beforeBulk(executionId, bulkRequest);
133-
semaphore.acquire();
134-
acquired = true;
135-
Retry.on(EsRejectedExecutionException.class)
136-
.policy(backoffPolicy)
137-
.using(threadPool)
138-
.withAsyncBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
139-
@Override
140-
public void onResponse(BulkResponse response) {
141-
try {
142-
listener.afterBulk(executionId, bulkRequest, response);
143-
} finally {
144-
semaphore.release();
145-
}
146-
}
147-
148-
@Override
149-
public void onFailure(Exception e) {
150-
try {
151-
listener.afterBulk(executionId, bulkRequest, e);
152-
} finally {
153-
semaphore.release();
154-
}
155-
}
156-
}, Settings.EMPTY);
157-
bulkRequestSetupSuccessful = true;
158-
} catch (InterruptedException e) {
159-
Thread.currentThread().interrupt();
160-
logger.info((Supplier<?>) () -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
161-
listener.afterBulk(executionId, bulkRequest, e);
162-
} catch (Exception e) {
163-
logger.warn((Supplier<?>) () -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
164-
listener.afterBulk(executionId, bulkRequest, e);
165-
} finally {
166-
if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore
167-
semaphore.release();
77+
@Override
78+
public void onFailure(Exception e) {
79+
try {
80+
listener.afterBulk(executionId, bulkRequest, e);
81+
} finally {
82+
semaphore.release();
83+
latch.countDown();
84+
}
16885
}
86+
}, Settings.EMPTY);
87+
bulkRequestSetupSuccessful = true;
88+
if (concurrentRequests == 0) {
89+
latch.await();
90+
}
91+
} catch (InterruptedException e) {
92+
Thread.currentThread().interrupt();
93+
logger.info((Supplier<?>) () -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
94+
listener.afterBulk(executionId, bulkRequest, e);
95+
} catch (Exception e) {
96+
logger.warn((Supplier<?>) () -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
97+
listener.afterBulk(executionId, bulkRequest, e);
98+
} finally {
99+
if (bulkRequestSetupSuccessful == false) { // if we fail on client.bulk() release the semaphore
100+
toRelease.run();
169101
}
170102
}
103+
}
171104

172-
@Override
173-
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
174-
if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
175-
semaphore.release(this.concurrentRequests);
176-
return true;
177-
}
178-
return false;
105+
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
106+
if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
107+
semaphore.release(this.concurrentRequests);
108+
return true;
179109
}
110+
return false;
180111
}
181112
}

core/src/main/java/org/elasticsearch/action/bulk/Retry.java

Lines changed: 13 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -25,83 +25,57 @@
2525
import org.elasticsearch.common.logging.Loggers;
2626
import org.elasticsearch.common.settings.Settings;
2727
import org.elasticsearch.common.unit.TimeValue;
28-
import org.elasticsearch.common.util.concurrent.EsExecutors;
2928
import org.elasticsearch.common.util.concurrent.FutureUtils;
3029
import org.elasticsearch.threadpool.ThreadPool;
3130

3231
import java.util.ArrayList;
3332
import java.util.Iterator;
3433
import java.util.List;
35-
import java.util.concurrent.Executors;
36-
import java.util.concurrent.ScheduledExecutorService;
3734
import java.util.concurrent.ScheduledFuture;
38-
import java.util.concurrent.ScheduledThreadPoolExecutor;
39-
import java.util.concurrent.TimeUnit;
4035
import java.util.function.BiConsumer;
41-
import java.util.function.BiFunction;
4236
import java.util.function.Predicate;
4337

4438
/**
4539
* Encapsulates synchronous and asynchronous retry logic.
4640
*/
4741
public class Retry {
4842
private final Class<? extends Throwable> retryOnThrowable;
43+
private final BackoffPolicy backoffPolicy;
44+
private final ThreadPool threadPool;
4945

50-
private BackoffPolicy backoffPolicy;
51-
private ThreadPool threadPool;
5246

53-
public static Retry on(Class<? extends Throwable> retryOnThrowable) {
54-
return new Retry(retryOnThrowable);
55-
}
56-
57-
Retry(Class<? extends Throwable> retryOnThrowable) {
47+
public Retry(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, ThreadPool threadPool) {
5848
this.retryOnThrowable = retryOnThrowable;
59-
}
60-
61-
/**
62-
* @param backoffPolicy The backoff policy that defines how long and how often to wait for retries.
63-
*/
64-
public Retry policy(BackoffPolicy backoffPolicy) {
6549
this.backoffPolicy = backoffPolicy;
66-
return this;
67-
}
68-
69-
/**
70-
* @param threadPool The threadPool that will be used to schedule retries.
71-
*/
72-
public Retry using(ThreadPool threadPool) {
7350
this.threadPool = threadPool;
74-
return this;
7551
}
7652

7753
/**
78-
* Invokes #apply(BulkRequest, ActionListener). Backs off on the provided exception and delegates results to the
79-
* provided listener. Retries will be attempted using the provided schedule function
54+
* Invokes #accept(BulkRequest, ActionListener). Backs off on the provided exception and delegates results to the
55+
* provided listener. Retries will be scheduled using the class's thread pool.
8056
* @param consumer The consumer to which apply the request and listener
8157
* @param bulkRequest The bulk request that should be executed.
8258
* @param listener A listener that is invoked when the bulk request finishes or completes with an exception. The listener is not
8359
* @param settings settings
8460
*/
85-
public void withAsyncBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest, ActionListener<BulkResponse> listener, Settings settings) {
61+
public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest, ActionListener<BulkResponse> listener, Settings settings) {
8662
RetryHandler r = new RetryHandler(retryOnThrowable, backoffPolicy, consumer, listener, settings, threadPool);
8763
r.execute(bulkRequest);
8864
}
8965

9066
/**
91-
* Invokes #apply(BulkRequest, ActionListener). Backs off on the provided exception. Retries will be attempted using
92-
* the provided schedule function.
67+
* Invokes #accept(BulkRequest, ActionListener). Backs off on the provided exception. Retries will be scheduled using
68+
* the class's thread pool.
9369
*
9470
* @param consumer The consumer to which apply the request and listener
9571
* @param bulkRequest The bulk request that should be executed.
9672
* @param settings settings
97-
* @return the bulk response as returned by the client.
98-
* @throws Exception Any exception thrown by the callable.
73+
* @return a future representing the bulk response returned by the client.
9974
*/
100-
public BulkResponse withSyncBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest, Settings settings) throws Exception {
101-
PlainActionFuture<BulkResponse> actionFuture = PlainActionFuture.newFuture();
102-
RetryHandler r = new RetryHandler(retryOnThrowable, backoffPolicy, consumer, actionFuture, settings, threadPool);
103-
r.execute(bulkRequest);
104-
return actionFuture.actionGet();
75+
public PlainActionFuture<BulkResponse> withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest, Settings settings) {
76+
PlainActionFuture<BulkResponse> future = PlainActionFuture.newFuture();
77+
withBackoff(consumer, bulkRequest, future, settings);
78+
return future;
10579
}
10680

10781
static class RetryHandler implements ActionListener<BulkResponse> {

core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public AbstractAsyncBulkByScrollAction(WorkingBulkByScrollTask task, Logger logg
134134
this.mainRequest = mainRequest;
135135
this.listener = listener;
136136
BackoffPolicy backoffPolicy = buildBackoffPolicy();
137-
bulkRetry = Retry.on(EsRejectedExecutionException.class).policy(BackoffPolicy.wrap(backoffPolicy, task::countBulkRetry)).using(threadPool);
137+
bulkRetry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.wrap(backoffPolicy, task::countBulkRetry), threadPool);
138138
scrollSource = buildScrollableResultSource(backoffPolicy);
139139
scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null");
140140
/*
@@ -337,7 +337,7 @@ void sendBulkRequest(TimeValue thisBatchStartTime, BulkRequest request) {
337337
finishHim(null);
338338
return;
339339
}
340-
bulkRetry.withAsyncBackoff(client::bulk, request, new ActionListener<BulkResponse>() {
340+
bulkRetry.withBackoff(client::bulk, request, new ActionListener<BulkResponse>() {
341341
@Override
342342
public void onResponse(BulkResponse response) {
343343
onBulkResponse(thisBatchStartTime, response);

0 commit comments

Comments
 (0)