Skip to content

Commit c38079d

Browse files
authored
Prevent deadlock by using separate schedulers (#48697)
Currently the BulkProcessor class uses a single scheduler to schedule flushes and retries. Functionally these are very different concerns, but sharing one scheduler between them can result in a deadlock. Specifically, the single shared scheduler can kick off a flush task, which only finishes its task when the bulk that is being flushed finishes. If (for whatever reason) any items in that bulk fail, it will (by default) schedule a retry. However, that retry will never run its task, since the flush task is consuming the one and only thread available from the shared scheduler. Since the BulkProcessor is mostly client-based code, the client can provide their own scheduler. As-is, the scheduler would require at minimum 2 worker threads to avoid the potential deadlock. Since the number of threads is a configuration option in the scheduler, the code cannot enforce this 2-worker rule until runtime. For this reason this commit splits the single task scheduler into 2 schedulers. This eliminates the potential for the flush task to block the retry task and removes this deadlock scenario. This commit also deprecates the Java APIs that presume a single scheduler, and updates any internal code to no longer use those APIs. Fixes #47599 Note - #41451 fixed the general case where a bulk that fails and is retried can result in a deadlock. This fix should address that case as well as the case when a bulk failure *from the flush* needs to be retried.
1 parent 95ee5b1 commit c38079d

File tree

7 files changed

+72
-22
lines changed

7 files changed

+72
-22
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

+60-11
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ public static class Builder {
8484

8585
private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
8686
private final Listener listener;
87-
private final Scheduler scheduler;
87+
private final Scheduler flushScheduler;
88+
private final Scheduler retryScheduler;
8889
private final Runnable onClose;
8990
private int concurrentRequests = 1;
9091
private int bulkActions = 1000;
@@ -96,10 +97,11 @@ public static class Builder {
9697
private String globalPipeline;
9798

9899
private Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener,
99-
Scheduler scheduler, Runnable onClose) {
100+
Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose) {
100101
this.consumer = consumer;
101102
this.listener = listener;
102-
this.scheduler = scheduler;
103+
this.flushScheduler = flushScheduler;
104+
this.retryScheduler = retryScheduler;
103105
this.onClose = onClose;
104106
}
105107

@@ -178,7 +180,7 @@ public Builder setBackoffPolicy(BackoffPolicy backoffPolicy) {
178180
*/
179181
public BulkProcessor build() {
180182
return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions,
181-
bulkSize, flushInterval, scheduler, onClose, createBulkRequestWithGlobalDefaults());
183+
bulkSize, flushInterval, flushScheduler, retryScheduler, onClose, createBulkRequestWithGlobalDefaults());
182184
}
183185

184186
private Supplier<BulkRequest> createBulkRequestWithGlobalDefaults() {
@@ -188,19 +190,55 @@ private Supplier<BulkRequest> createBulkRequestWithGlobalDefaults() {
188190
}
189191
}
190192

193+
/**
194+
* @param client The client that executes the bulk operations
195+
* @param listener The BulkProcessor listener that gets called on bulk events
196+
* @param flushScheduler The scheduler that is used to flush
197+
* @param retryScheduler The scheduler that is used for retries
198+
* @param onClose The runnable instance that is executed on close. Consumers are required to clean up the schedulers.
199+
* @return the builder for BulkProcessor
200+
*/
201+
public static Builder builder(Client client, Listener listener, Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose) {
202+
Objects.requireNonNull(client, "client");
203+
Objects.requireNonNull(listener, "listener");
204+
return new Builder(client::bulk, listener, flushScheduler, retryScheduler, onClose);
205+
}
206+
207+
208+
/**
209+
* @param client The client that executes the bulk operations
210+
* @param listener The BulkProcessor listener that gets called on bulk events
211+
* @return the builder for BulkProcessor
212+
* @deprecated Use {@link #builder(java.util.function.BiConsumer, org.elasticsearch.action.bulk.BulkProcessor.Listener)}
213+
* with client::bulk as the first argument, or {@link #builder(org.elasticsearch.client.Client,
214+
* org.elasticsearch.action.bulk.BulkProcessor.Listener, org.elasticsearch.threadpool.Scheduler,
215+
* org.elasticsearch.threadpool.Scheduler, java.lang.Runnable)} and manage the flush and retry schedulers explicitly
216+
*/
217+
@Deprecated
191218
public static Builder builder(Client client, Listener listener) {
192219
Objects.requireNonNull(client, "client");
193220
Objects.requireNonNull(listener, "listener");
194-
return new Builder(client::bulk, listener, client.threadPool(), () -> {});
221+
return new Builder(client::bulk, listener, client.threadPool(), client.threadPool(), () -> {});
195222
}
196223

224+
/**
225+
* @param consumer The consumer that is called to fulfil bulk operations
226+
* @param listener The BulkProcessor listener that gets called on bulk events
227+
* @return the builder for BulkProcessor
228+
*/
197229
public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
198230
Objects.requireNonNull(consumer, "consumer");
199231
Objects.requireNonNull(listener, "listener");
200-
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
232+
final ScheduledThreadPoolExecutor flushScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
233+
final ScheduledThreadPoolExecutor retryScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
201234
return new Builder(consumer, listener,
202-
buildScheduler(scheduledThreadPoolExecutor),
203-
() -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
235+
buildScheduler(flushScheduledThreadPoolExecutor),
236+
buildScheduler(retryScheduledThreadPoolExecutor),
237+
() ->
238+
{
239+
Scheduler.terminate(flushScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS);
240+
Scheduler.terminate(retryScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS);
241+
});
204242
}
205243

206244
private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
@@ -225,17 +263,28 @@ private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThr
225263

226264
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
227265
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
228-
Scheduler scheduler, Runnable onClose, Supplier<BulkRequest> bulkRequestSupplier) {
266+
Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose, Supplier<BulkRequest> bulkRequestSupplier) {
229267
this.bulkActions = bulkActions;
230268
this.bulkSize = bulkSize.getBytes();
231269
this.bulkRequest = bulkRequestSupplier.get();
232270
this.bulkRequestSupplier = bulkRequestSupplier;
233-
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
271+
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, retryScheduler, concurrentRequests);
234272
// Start period flushing task after everything is setup
235-
this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
273+
this.cancellableFlushTask = startFlushTask(flushInterval, flushScheduler);
236274
this.onClose = onClose;
237275
}
238276

277+
/**
278+
* @deprecated use the {@link BulkProcessor} constructor which uses separate schedulers for flush and retry
279+
*/
280+
@Deprecated
281+
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
282+
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
283+
Scheduler scheduler, Runnable onClose, Supplier<BulkRequest> bulkRequestSupplier) {
284+
this(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval,
285+
scheduler, scheduler, onClose, bulkRequestSupplier );
286+
}
287+
239288
/**
240289
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
241290
*/

server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void testThatBulkProcessorCountIsCorrect() throws Exception {
5757
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
5858

5959
int numDocs = randomIntBetween(10, 100);
60-
try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
60+
try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
6161
//let's make sure that the bulk action limit trips, one single execution will index all the documents
6262
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
6363
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
@@ -81,7 +81,7 @@ public void testBulkProcessorFlush() throws Exception {
8181

8282
int numDocs = randomIntBetween(10, 100);
8383

84-
try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
84+
try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
8585
//let's make sure that this bulk won't be automatically flushed
8686
.setConcurrentRequests(randomIntBetween(0, 10)).setBulkActions(numDocs + randomIntBetween(1, 100))
8787
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
@@ -116,7 +116,7 @@ public void testBulkProcessorConcurrentRequests() throws Exception {
116116

117117
MultiGetRequestBuilder multiGetRequestBuilder;
118118

119-
try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
119+
try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
120120
.setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
121121
//set interval and size to high values
122122
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
@@ -155,7 +155,7 @@ public void testBulkProcessorWaitOnClose() throws Exception {
155155
BulkProcessorTestListener listener = new BulkProcessorTestListener();
156156

157157
int numDocs = randomIntBetween(10, 100);
158-
BulkProcessor processor = BulkProcessor.builder(client(), listener)
158+
BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
159159
//let's make sure that the bulk action limit trips, one single execution will index all the documents
160160
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
161161
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10),
@@ -202,7 +202,7 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
202202
MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
203203
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);
204204

205-
try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
205+
try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
206206
.setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
207207
//set interval and size to high values
208208
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {

server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejec
7777
assertAcked(prepareCreate(INDEX_NAME));
7878
ensureGreen();
7979

80-
BulkProcessor bulkProcessor = BulkProcessor.builder(client(), new BulkProcessor.Listener() {
80+
BulkProcessor bulkProcessor = BulkProcessor.builder(client()::bulk, new BulkProcessor.Listener() {
8181
@Override
8282
public void beforeBulk(long executionId, BulkRequest request) {
8383
// no op

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
485485
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
486486
};
487487
int bulkSize = between(1, 20);
488-
BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient(), listener)
488+
BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient()::bulk, listener)
489489
.setBulkActions(bulkSize)
490490
.setConcurrentRequests(4)
491491
.build();

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.action.bulk.BulkResponse;
1616
import org.elasticsearch.bootstrap.BootstrapCheck;
1717
import org.elasticsearch.client.Client;
18+
import org.elasticsearch.client.OriginSettingClient;
1819
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1920
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
2021
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -53,7 +54,6 @@
5354
import org.elasticsearch.threadpool.FixedExecutorBuilder;
5455
import org.elasticsearch.threadpool.ThreadPool;
5556
import org.elasticsearch.watcher.ResourceWatcherService;
56-
import org.elasticsearch.xpack.core.ClientHelper;
5757
import org.elasticsearch.xpack.core.XPackPlugin;
5858
import org.elasticsearch.xpack.core.XPackSettings;
5959
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
@@ -324,7 +324,7 @@ ScriptTransform.TYPE, new ScriptTransformFactory(scriptService),
324324
final InputRegistry inputRegistry = new InputRegistry(inputFactories);
325325
inputFactories.put(ChainInput.TYPE, new ChainInputFactory(inputRegistry));
326326

327-
bulkProcessor = BulkProcessor.builder(ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN), new BulkProcessor.Listener() {
327+
bulkProcessor = BulkProcessor.builder(new OriginSettingClient(client, WATCHER_ORIGIN)::bulk, new BulkProcessor.Listener() {
328328
@Override
329329
public void beforeBulk(long executionId, BulkRequest request) {
330330
}

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ public void init() {
128128
when(client.settings()).thenReturn(settings);
129129
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
130130
parser = mock(TriggeredWatch.Parser.class);
131-
BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build();
131+
BulkProcessor bulkProcessor = BulkProcessor.
132+
builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build();
132133
triggeredWatchStore = new TriggeredWatchStore(settings, client, parser, bulkProcessor);
133134
}
134135

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public void init() {
7373
when(client.settings()).thenReturn(settings);
7474
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings));
7575
BulkProcessor.Listener listener = mock(BulkProcessor.Listener.class);
76-
BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build();
76+
BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build();
7777
historyStore = new HistoryStore(bulkProcessor);
7878
}
7979

0 commit comments

Comments
 (0)