Skip to content

Commit a0a484a

Browse files
authored
[6.8] Prevent deadlock by using separate schedulers (#48697) (#48963)
* Prevent deadlock by using separate schedulers (#48697) Currently the BulkProcessor class uses a single scheduler to schedule flushes and retries. Functionally these are very different concerns but can result in a deadlock. Specifically, the single shared scheduler can kick off a flush task, which only finishes its task when the bulk that is being flushed finishes. If (for whatever reason) any item in that bulk fails, it will (by default) schedule a retry. However, that retry will never run its task, since the flush task is consuming the 1 and only thread available from the shared scheduler. Since the BulkProcessor is mostly client based code, the client can provide their own scheduler. As-is the scheduler would require at minimum 2 worker threads to avoid the potential deadlock. Since the number of threads is a configuration option in the scheduler, the code cannot enforce this 2 worker rule until runtime. For this reason this commit splits the single task scheduler into 2 schedulers. This eliminates the potential for the flush task to block the retry task and removes this deadlock scenario. This commit also deprecates the Java APIs that presume a single scheduler, and updates any internal code to no longer use those APIs. Fixes #47599 Note - #41451 fixed the general case where a bulk fails and is retried that can result in a deadlock. This fix should address that case as well as the case when a bulk failure *from the flush* needs to be retried.
1 parent d2e96ce commit a0a484a

File tree

7 files changed

+72
-24
lines changed

7 files changed

+72
-24
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

+60-13
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ public static class Builder {
8282

8383
private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
8484
private final Listener listener;
85-
private final Scheduler scheduler;
85+
private final Scheduler flushScheduler;
86+
private final Scheduler retryScheduler;
8687
private final Runnable onClose;
8788
private int concurrentRequests = 1;
8889
private int bulkActions = 1000;
@@ -95,10 +96,11 @@ public static class Builder {
9596
private String globalPipeline;
9697

9798
private Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener,
98-
Scheduler scheduler, Runnable onClose) {
99+
Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose) {
99100
this.consumer = consumer;
100101
this.listener = listener;
101-
this.scheduler = scheduler;
102+
this.flushScheduler = flushScheduler;
103+
this.retryScheduler = retryScheduler;
102104
this.onClose = onClose;
103105
}
104106

@@ -182,7 +184,7 @@ public Builder setBackoffPolicy(BackoffPolicy backoffPolicy) {
182184
*/
183185
public BulkProcessor build() {
184186
return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions,
185-
bulkSize, flushInterval, scheduler, onClose, createBulkRequestWithGlobalDefaults());
187+
bulkSize, flushInterval, flushScheduler, retryScheduler, onClose, createBulkRequestWithGlobalDefaults());
186188
}
187189

188190
private Supplier<BulkRequest> createBulkRequestWithGlobalDefaults() {
@@ -192,19 +194,55 @@ private Supplier<BulkRequest> createBulkRequestWithGlobalDefaults() {
192194
}
193195
}
194196

197+
/**
198+
* @param client The client that executes the bulk operations
199+
* @param listener The BulkProcessor listener that gets called on bulk events
200+
* @param flushScheduler The scheduler that is used to flush
201+
* @param retryScheduler The scheduler that is used for retries
202+
* @param onClose The runnable instance that is executed on close. Consumers are required to clean up the schedulers.
203+
* @return the builder for BulkProcessor
204+
*/
205+
public static Builder builder(Client client, Listener listener, Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose) {
206+
Objects.requireNonNull(client, "client");
207+
Objects.requireNonNull(listener, "listener");
208+
return new Builder(client::bulk, listener, flushScheduler, retryScheduler, onClose);
209+
}
210+
211+
212+
/**
213+
* @param client The client that executes the bulk operations
214+
* @param listener The BulkProcessor listener that gets called on bulk events
215+
* @return the builder for BulkProcessor
216+
* @deprecated Use {@link #builder(java.util.function.BiConsumer, org.elasticsearch.action.bulk.BulkProcessor.Listener)}
217+
* with client::bulk as the first argument, or {@link #builder(org.elasticsearch.client.Client,
218+
* org.elasticsearch.action.bulk.BulkProcessor.Listener, org.elasticsearch.threadpool.Scheduler,
219+
* org.elasticsearch.threadpool.Scheduler, java.lang.Runnable)} and manage the flush and retry schedulers explicitly
220+
*/
221+
@Deprecated
195222
public static Builder builder(Client client, Listener listener) {
196223
Objects.requireNonNull(client, "client");
197224
Objects.requireNonNull(listener, "listener");
198-
return new Builder(client::bulk, listener, client.threadPool(), () -> {});
225+
return new Builder(client::bulk, listener, client.threadPool(), client.threadPool(), () -> {});
199226
}
200227

228+
/**
229+
* @param consumer The consumer that is called to fulfil bulk operations
230+
* @param listener The BulkProcessor listener that gets called on bulk events
231+
* @return the builder for BulkProcessor
232+
*/
201233
public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
202234
Objects.requireNonNull(consumer, "consumer");
203235
Objects.requireNonNull(listener, "listener");
204-
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
236+
final ScheduledThreadPoolExecutor flushScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
237+
final ScheduledThreadPoolExecutor retryScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
205238
return new Builder(consumer, listener,
206-
buildScheduler(scheduledThreadPoolExecutor),
207-
() -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
239+
buildScheduler(flushScheduledThreadPoolExecutor),
240+
buildScheduler(retryScheduledThreadPoolExecutor),
241+
() ->
242+
{
243+
Scheduler.terminate(flushScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS);
244+
Scheduler.terminate(retryScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS);
245+
});
208246
}
209247

210248
private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
@@ -222,25 +260,34 @@ private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThr
222260
private BulkRequest bulkRequest;
223261
private final Supplier<BulkRequest> bulkRequestSupplier;
224262
private final BulkRequestHandler bulkRequestHandler;
225-
private final Scheduler scheduler;
226263
private final Runnable onClose;
227264

228265
private volatile boolean closed = false;
229266

230267
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
231268
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
232-
Scheduler scheduler, Runnable onClose, Supplier<BulkRequest> bulkRequestSupplier) {
269+
Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose, Supplier<BulkRequest> bulkRequestSupplier) {
233270
this.bulkActions = bulkActions;
234271
this.bulkSize = bulkSize.getBytes();
235-
this.scheduler = scheduler;
236272
this.bulkRequest = bulkRequestSupplier.get();
237273
this.bulkRequestSupplier = bulkRequestSupplier;
238-
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
274+
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, retryScheduler, concurrentRequests);
239275
// Start period flushing task after everything is setup
240-
this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
276+
this.cancellableFlushTask = startFlushTask(flushInterval, flushScheduler);
241277
this.onClose = onClose;
242278
}
243279

280+
/**
281+
* @deprecated use the {@link BulkProcessor} constructor which uses separate schedulers for flush and retry
282+
*/
283+
@Deprecated
284+
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
285+
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
286+
Scheduler scheduler, Runnable onClose, Supplier<BulkRequest> bulkRequestSupplier) {
287+
this(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval,
288+
scheduler, scheduler, onClose, bulkRequestSupplier );
289+
}
290+
244291
/**
245292
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
246293
*/

server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public void testThatBulkProcessorCountIsCorrect() throws Exception {
6363
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
6464

6565
int numDocs = randomIntBetween(10, 100);
66-
try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
66+
try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
6767
//let's make sure that the bulk action limit trips, one single execution will index all the documents
6868
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
6969
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
@@ -87,7 +87,7 @@ public void testBulkProcessorFlush() throws Exception {
8787

8888
int numDocs = randomIntBetween(10, 100);
8989

90-
try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
90+
try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
9191
//let's make sure that this bulk won't be automatically flushed
9292
.setConcurrentRequests(randomIntBetween(0, 10)).setBulkActions(numDocs + randomIntBetween(1, 100))
9393
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
@@ -122,7 +122,7 @@ public void testBulkProcessorConcurrentRequests() throws Exception {
122122

123123
MultiGetRequestBuilder multiGetRequestBuilder;
124124

125-
try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
125+
try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
126126
.setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
127127
//set interval and size to high values
128128
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
@@ -204,7 +204,7 @@ public void testBulkProcessorWaitOnClose() throws Exception {
204204
BulkProcessorTestListener listener = new BulkProcessorTestListener();
205205

206206
int numDocs = randomIntBetween(10, 100);
207-
BulkProcessor processor = BulkProcessor.builder(client(), listener)
207+
BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
208208
//let's make sure that the bulk action limit trips, one single execution will index all the documents
209209
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
210210
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10),
@@ -251,7 +251,7 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
251251
MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
252252
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);
253253

254-
try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
254+
try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
255255
.setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
256256
//set interval and size to high values
257257
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {

server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejec
7777
assertAcked(prepareCreate(INDEX_NAME));
7878
ensureGreen();
7979

80-
BulkProcessor bulkProcessor = BulkProcessor.builder(client(), new BulkProcessor.Listener() {
80+
BulkProcessor bulkProcessor = BulkProcessor.builder(client()::bulk, new BulkProcessor.Listener() {
8181
@Override
8282
public void beforeBulk(long executionId, BulkRequest request) {
8383
// no op

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
438438
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
439439
};
440440
int bulkSize = between(1, 20);
441-
BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient(), listener)
441+
BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient()::bulk, listener)
442442
.setBulkActions(bulkSize)
443443
.setConcurrentRequests(4)
444444
.build();

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.action.bulk.BulkResponse;
1717
import org.elasticsearch.bootstrap.BootstrapCheck;
1818
import org.elasticsearch.client.Client;
19+
import org.elasticsearch.client.OriginSettingClient;
1920
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2021
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
2122
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -57,7 +58,6 @@
5758
import org.elasticsearch.threadpool.FixedExecutorBuilder;
5859
import org.elasticsearch.threadpool.ThreadPool;
5960
import org.elasticsearch.watcher.ResourceWatcherService;
60-
import org.elasticsearch.xpack.core.ClientHelper;
6161
import org.elasticsearch.xpack.core.XPackPlugin;
6262
import org.elasticsearch.xpack.core.XPackSettings;
6363
import org.elasticsearch.xpack.core.ssl.SSLService;
@@ -356,7 +356,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
356356
final InputRegistry inputRegistry = new InputRegistry(inputFactories);
357357
inputFactories.put(ChainInput.TYPE, new ChainInputFactory(inputRegistry));
358358

359-
bulkProcessor = BulkProcessor.builder(ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN), new BulkProcessor.Listener() {
359+
bulkProcessor = BulkProcessor.builder(new OriginSettingClient(client, WATCHER_ORIGIN)::bulk, new BulkProcessor.Listener() {
360360
@Override
361361
public void beforeBulk(long executionId, BulkRequest request) {
362362
}

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ public void init() {
128128
when(client.settings()).thenReturn(settings);
129129
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
130130
parser = mock(TriggeredWatch.Parser.class);
131-
BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build();
131+
BulkProcessor bulkProcessor = BulkProcessor.
132+
builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build();
132133
triggeredWatchStore = new TriggeredWatchStore(settings, client, parser, bulkProcessor);
133134
}
134135

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void init() {
7070
when(client.settings()).thenReturn(settings);
7171
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings));
7272
BulkProcessor.Listener listener = mock(BulkProcessor.Listener.class);
73-
BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build();
73+
BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build();
7474
historyStore = new HistoryStore(bulkProcessor);
7575
}
7676

0 commit comments

Comments
 (0)