Skip to content

Commit 9eee2cc

Browse files
committed
Fix Watcher deadlock that can cause in-abilty to index documents. (elastic#41418)
This commit removes the usage of the `BulkProcessor` to write history documents and delete triggered watches on a `EsRejectedExecutionException`. Since the exception could be handled on the write thread, the write thread can be blocked waiting on watcher threads (due to a synchronous method). This is problematic since those watcher threads can be blocked waiting on write threads. This commit also moves the handling of the exception to the generic threadpool to avoid submitting write requests from the write thread pool. fixes elastic#41390
1 parent d1f48fe commit 9eee2cc

File tree

3 files changed

+214
-20
lines changed

3 files changed

+214
-20
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.bulk.BulkItemResponse;
1616
import org.elasticsearch.action.bulk.BulkResponse;
17+
import org.elasticsearch.action.delete.DeleteRequest;
1718
import org.elasticsearch.action.get.GetRequest;
1819
import org.elasticsearch.action.get.GetResponse;
20+
import org.elasticsearch.action.index.IndexRequest;
1921
import org.elasticsearch.action.support.PlainActionFuture;
2022
import org.elasticsearch.action.update.UpdateRequest;
2123
import org.elasticsearch.client.Client;
@@ -32,19 +34,25 @@
3234
import org.elasticsearch.common.util.concurrent.ThreadContext;
3335
import org.elasticsearch.common.xcontent.ToXContent;
3436
import org.elasticsearch.common.xcontent.XContentBuilder;
37+
import org.elasticsearch.common.xcontent.XContentFactory;
3538
import org.elasticsearch.common.xcontent.XContentType;
3639
import org.elasticsearch.common.xcontent.json.JsonXContent;
3740
import org.elasticsearch.index.engine.DocumentMissingException;
41+
import org.elasticsearch.index.engine.VersionConflictEngineException;
3842
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
3943
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
4044
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
4145
import org.elasticsearch.xpack.core.watcher.condition.Condition;
4246
import org.elasticsearch.xpack.core.watcher.execution.ExecutionState;
4347
import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch;
48+
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
4449
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
4550
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
51+
import org.elasticsearch.xpack.core.watcher.execution.Wid;
52+
import org.elasticsearch.xpack.core.watcher.history.HistoryStoreField;
4653
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
4754
import org.elasticsearch.xpack.core.watcher.input.Input;
55+
import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams;
4856
import org.elasticsearch.xpack.core.watcher.transform.Transform;
4957
import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
5058
import org.elasticsearch.xpack.core.watcher.watch.Watch;
@@ -402,22 +410,68 @@ private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch trigge
402410
try {
403411
executor.execute(new WatchExecutionTask(ctx, () -> execute(ctx)));
404412
} catch (EsRejectedExecutionException e) {
405-
String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity";
406-
WatchRecord record = ctx.abortBeforeExecution(ExecutionState.THREADPOOL_REJECTION, message);
407-
try {
408-
if (ctx.overrideRecordOnConflict()) {
409-
historyStore.forcePut(record);
410-
} else {
411-
historyStore.put(record);
413+
//Using the generic pool here since this can happen from a write thread and we don't want to block a write
414+
//thread to kick off these additional write/delete requests.
415+
//Intentionally not using the HistoryStore or TriggerWatchStore to avoid re-using the same synchronous
416+
//BulkProcessor which can cause a deadlock see #41390
417+
genericExecutor.execute(new WatchExecutionTask(ctx, () -> {
418+
String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity";
419+
logger.warn(message);
420+
WatchRecord record = ctx.abortBeforeExecution(ExecutionState.THREADPOOL_REJECTION, message);
421+
try {
422+
forcePutHistory(record);
423+
} catch (Exception exc) {
424+
logger.error((Supplier<?>) () ->
425+
new ParameterizedMessage(
426+
"Error storing watch history record for watch [{}] after thread pool rejection",
427+
triggeredWatch.id()), exc);
412428
}
413-
} catch (Exception exc) {
414-
logger.error((Supplier<?>) () ->
415-
new ParameterizedMessage("Error storing watch history record for watch [{}] after thread pool rejection",
416-
triggeredWatch.id()), exc);
429+
deleteTrigger(triggeredWatch.id());
430+
}));
431+
}
432+
}
433+
434+
/**
435+
* Stores the specified watchRecord.
436+
* Any existing watchRecord will be overwritten.
437+
*/
438+
private void forcePutHistory(WatchRecord watchRecord) {
439+
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
440+
try {
441+
try (XContentBuilder builder = XContentFactory.jsonBuilder();
442+
ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) {
443+
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);
444+
IndexRequest request = new IndexRequest(index)
445+
.id(watchRecord.id().value())
446+
.source(builder)
447+
.opType(IndexRequest.OpType.CREATE);
448+
client.index(request).get(30, TimeUnit.SECONDS);
449+
logger.debug("indexed watch history record [{}]", watchRecord.id().value());
450+
} catch (VersionConflictEngineException vcee) {
451+
watchRecord = new WatchRecord.MessageWatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES,
452+
"watch record [{ " + watchRecord.id() + " }] has been stored before, previous state [" + watchRecord.state() + "]");
453+
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder();
454+
ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) {
455+
IndexRequest request = new IndexRequest(index)
456+
.id(watchRecord.id().value())
457+
.source(xContentBuilder.value(watchRecord));
458+
client.index(request).get(30, TimeUnit.SECONDS);
459+
}
460+
logger.debug("overwrote watch history record [{}]", watchRecord.id().value());
417461
}
462+
} catch (InterruptedException | ExecutionException | TimeoutException | IOException ioe) {
463+
final WatchRecord wr = watchRecord;
464+
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to persist watch record [{}]", wr), ioe);
465+
}
466+
}
418467

419-
triggeredWatchStore.delete(triggeredWatch.id());
468+
private void deleteTrigger(Wid watcherId) {
469+
DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME);
470+
request.id(watcherId.value());
471+
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) {
472+
client.delete(request).actionGet(30, TimeUnit.SECONDS);
420473
}
474+
logger.trace("successfully deleted triggered watch with id [{}]", watcherId);
421475
}
422476

423477
WatchRecord executeInner(WatchExecutionContext ctx) {

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
*/
66
package org.elasticsearch.xpack.watcher.execution;
77

8-
import org.elasticsearch.ElasticsearchException;
98
import org.elasticsearch.Version;
9+
import org.elasticsearch.action.ActionFuture;
1010
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.delete.DeleteRequest;
1112
import org.elasticsearch.action.get.GetRequest;
1213
import org.elasticsearch.action.get.GetResponse;
14+
import org.elasticsearch.action.index.IndexRequest;
1315
import org.elasticsearch.action.support.PlainActionFuture;
1416
import org.elasticsearch.action.update.UpdateRequest;
1517
import org.elasticsearch.action.update.UpdateResponse;
@@ -31,8 +33,11 @@
3133
import org.elasticsearch.common.xcontent.XContentFactory;
3234
import org.elasticsearch.common.xcontent.XContentParser;
3335
import org.elasticsearch.common.xcontent.XContentType;
36+
import org.elasticsearch.index.Index;
3437
import org.elasticsearch.index.IndexNotFoundException;
38+
import org.elasticsearch.index.engine.VersionConflictEngineException;
3539
import org.elasticsearch.index.get.GetResult;
40+
import org.elasticsearch.index.shard.ShardId;
3641
import org.elasticsearch.test.ESTestCase;
3742
import org.elasticsearch.threadpool.ThreadPool;
3843
import org.elasticsearch.xpack.core.security.authc.Authentication;
@@ -51,6 +56,7 @@
5156
import org.elasticsearch.xpack.core.watcher.execution.ExecutionPhase;
5257
import org.elasticsearch.xpack.core.watcher.execution.ExecutionState;
5358
import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch;
59+
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
5460
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
5561
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
5662
import org.elasticsearch.xpack.core.watcher.execution.Wid;
@@ -92,6 +98,7 @@
9298
import static java.util.Collections.singletonMap;
9399
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
94100
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
101+
import static org.hamcrest.Matchers.containsString;
95102
import static org.hamcrest.Matchers.equalTo;
96103
import static org.hamcrest.Matchers.hasSize;
97104
import static org.hamcrest.Matchers.instanceOf;
@@ -834,21 +841,73 @@ public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exc
834841
when(getResponse.isExists()).thenReturn(true);
835842
when(getResponse.getId()).thenReturn("foo");
836843
mockGetWatchResponse(client, "foo", getResponse);
844+
ActionFuture actionFuture = mock(ActionFuture.class);
845+
when(actionFuture.get()).thenReturn("");
846+
when(client.index(any())).thenReturn(actionFuture);
847+
when(client.delete(any())).thenReturn(actionFuture);
848+
837849
when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch);
838850

839-
// execute needs to fail as well as storing the history
851+
// execute needs to fail
840852
doThrow(new EsRejectedExecutionException()).when(executor).execute(any());
841-
doThrow(new ElasticsearchException("whatever")).when(historyStore).forcePut(any());
842853

843854
Wid wid = new Wid(watch.id(), now());
844855

845856
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, new ScheduleTriggerEvent(now() ,now()));
846857
executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch));
847858

848-
verify(triggeredWatchStore, times(1)).delete(wid);
849-
ArgumentCaptor<WatchRecord> captor = ArgumentCaptor.forClass(WatchRecord.class);
850-
verify(historyStore, times(1)).forcePut(captor.capture());
851-
assertThat(captor.getValue().state(), is(ExecutionState.THREADPOOL_REJECTION));
859+
ArgumentCaptor<DeleteRequest> deleteCaptor = ArgumentCaptor.forClass(DeleteRequest.class);
860+
verify(client).delete(deleteCaptor.capture());
861+
assertThat(deleteCaptor.getValue().index(), equalTo(TriggeredWatchStoreField.INDEX_NAME));
862+
assertThat(deleteCaptor.getValue().id(), equalTo(wid.value()));
863+
864+
ArgumentCaptor<IndexRequest> watchHistoryCaptor = ArgumentCaptor.forClass(IndexRequest.class);
865+
verify(client).index(watchHistoryCaptor.capture());
866+
867+
assertThat(watchHistoryCaptor.getValue().source().utf8ToString(), containsString(ExecutionState.THREADPOOL_REJECTION.toString()));
868+
assertThat(watchHistoryCaptor.getValue().index(), containsString(".watcher-history"));
869+
}
870+
871+
public void testForcePutHistoryOnExecutionRejection() throws Exception {
872+
Watch watch = mock(Watch.class);
873+
when(watch.id()).thenReturn("foo");
874+
WatchStatus status = new WatchStatus(ZonedDateTime.now(ZoneOffset.UTC), Collections.emptyMap());
875+
when(watch.status()).thenReturn(status);
876+
GetResponse getResponse = mock(GetResponse.class);
877+
when(getResponse.isExists()).thenReturn(true);
878+
when(getResponse.getId()).thenReturn("foo");
879+
mockGetWatchResponse(client, "foo", getResponse);
880+
ActionFuture actionFuture = mock(ActionFuture.class);
881+
when(actionFuture.get()).thenReturn("");
882+
when(client.index(any()))
883+
.thenThrow(new VersionConflictEngineException(
884+
new ShardId(new Index("mockindex", "mockuuid"), 0), "id", "explaination"))
885+
.thenReturn(actionFuture);
886+
when(client.delete(any())).thenReturn(actionFuture);
887+
888+
when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch);
889+
890+
// execute needs to fail
891+
doThrow(new EsRejectedExecutionException()).when(executor).execute(any());
892+
893+
Wid wid = new Wid(watch.id(), ZonedDateTime.now(ZoneOffset.UTC));
894+
895+
TriggeredWatch triggeredWatch = new TriggeredWatch(wid,
896+
new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC), ZonedDateTime.now(ZoneOffset.UTC)));
897+
executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch));
898+
899+
ArgumentCaptor<DeleteRequest> deleteCaptor = ArgumentCaptor.forClass(DeleteRequest.class);
900+
verify(client).delete(deleteCaptor.capture());
901+
assertThat(deleteCaptor.getValue().index(), equalTo(TriggeredWatchStoreField.INDEX_NAME));
902+
assertThat(deleteCaptor.getValue().id(), equalTo(wid.value()));
903+
904+
ArgumentCaptor<IndexRequest> watchHistoryCaptor = ArgumentCaptor.forClass(IndexRequest.class);
905+
verify(client, times(2)).index(watchHistoryCaptor.capture());
906+
List<IndexRequest> indexRequests = watchHistoryCaptor.getAllValues();
907+
908+
assertThat(indexRequests.get(0).id(), equalTo(indexRequests.get(1).id()));
909+
assertThat(indexRequests.get(0).source().utf8ToString(), containsString(ExecutionState.THREADPOOL_REJECTION.toString()));
910+
assertThat(indexRequests.get(1).source().utf8ToString(), containsString(ExecutionState.EXECUTED_MULTIPLE_TIMES.toString()));
852911
}
853912

854913
public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exception {
@@ -887,7 +946,7 @@ public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exce
887946
when(watch.status()).thenReturn(watchStatus);
888947

889948
executionService.execute(context);
890-
verify(triggeredWatchStore, never()).delete(any());
949+
verify(client, never()).delete(any());
891950
}
892951

893952
public void testThatSingleWatchCannotBeExecutedConcurrently() throws Exception {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.watcher.test.integration;
7+
8+
import org.elasticsearch.action.search.SearchResponse;
9+
import org.elasticsearch.common.settings.Settings;
10+
import org.elasticsearch.license.LicenseService;
11+
import org.elasticsearch.xpack.core.XPackSettings;
12+
import org.elasticsearch.xpack.core.watcher.client.WatcherClient;
13+
import org.elasticsearch.xpack.watcher.condition.CompareCondition;
14+
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
15+
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
16+
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
17+
18+
import java.util.concurrent.TimeUnit;
19+
20+
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
21+
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
22+
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
23+
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
24+
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
25+
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
26+
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
27+
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
28+
import static org.hamcrest.Matchers.equalTo;
29+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
30+
31+
public class RejectedExecutionTests extends AbstractWatcherIntegrationTestCase {
32+
33+
@Override
34+
protected boolean timeWarped() {
35+
//need to use the real scheduler
36+
return false;
37+
}
38+
39+
public void testHistoryAndTriggeredOnRejection() throws Exception {
40+
WatcherClient watcherClient = watcherClient();
41+
createIndex("idx");
42+
client().prepareIndex("idx", "_doc").setSource("field", "a").get();
43+
refresh();
44+
WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "a")), "idx");
45+
watcherClient.preparePutWatch(randomAlphaOfLength(5))
46+
.setSource(watchBuilder()
47+
.trigger(schedule(interval(1, IntervalSchedule.Interval.Unit.SECONDS)))
48+
.input(searchInput(request))
49+
.condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.EQ, 1L))
50+
.addAction("_logger", loggingAction("_logging")
51+
.setCategory("_category")))
52+
.get();
53+
54+
assertBusy(() -> {
55+
flushAndRefresh(".watcher-history-*");
56+
SearchResponse searchResponse = client().prepareSearch(".watcher-history-*").get();
57+
assertThat(searchResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(2L));
58+
}, 10, TimeUnit.SECONDS);
59+
60+
flushAndRefresh(".triggered_watches");
61+
SearchResponse searchResponse = client().prepareSearch(".triggered_watches").get();
62+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
63+
}
64+
65+
@Override
66+
protected Settings nodeSettings(int nodeOrdinal) {
67+
68+
return Settings.builder()
69+
.put(super.nodeSettings(nodeOrdinal))
70+
.put(XPackSettings.MONITORING_ENABLED.getKey(), false)
71+
.put(XPackSettings.SECURITY_ENABLED.getKey(), false)
72+
.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial")
73+
.put("thread_pool.write.size", 1)
74+
.put("thread_pool.write.queue_size", 1)
75+
.put("xpack.watcher.thread_pool.size", 1)
76+
.put("xpack.watcher.thread_pool.queue_size", 0)
77+
.build();
78+
}
79+
80+
81+
}

0 commit comments

Comments
 (0)