Skip to content

Commit d46be8e

Browse files
committed
Fix Watcher deadlock that can cause in-abilty to index documents. (elastic#41418)
* Fix Watcher deadlock that can cause in-abilty to index documents. This commit removes the usage of the `BulkProcessor` to write history documents and delete triggered watches on a `EsRejectedExecutionException`. Since the exception could be handled on the write thread, the write thread can be blocked waiting on watcher threads (due to a synchronous method). This is problematic since those watcher threads can be blocked waiting on write threads. This commit also moves the handling of the exception to the generic threadpool to avoid submitting write requests from the write thread pool. fixes elastic#41390
1 parent cc39233 commit d46be8e

File tree

3 files changed

+216
-20
lines changed

3 files changed

+216
-20
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
import org.elasticsearch.action.ActionListener;
1717
import org.elasticsearch.action.bulk.BulkItemResponse;
1818
import org.elasticsearch.action.bulk.BulkResponse;
19+
import org.elasticsearch.action.delete.DeleteRequest;
1920
import org.elasticsearch.action.get.GetRequest;
2021
import org.elasticsearch.action.get.GetResponse;
22+
import org.elasticsearch.action.index.IndexRequest;
2123
import org.elasticsearch.action.support.PlainActionFuture;
2224
import org.elasticsearch.action.update.UpdateRequest;
2325
import org.elasticsearch.client.Client;
@@ -33,19 +35,25 @@
3335
import org.elasticsearch.common.util.concurrent.ThreadContext;
3436
import org.elasticsearch.common.xcontent.ToXContent;
3537
import org.elasticsearch.common.xcontent.XContentBuilder;
38+
import org.elasticsearch.common.xcontent.XContentFactory;
3639
import org.elasticsearch.common.xcontent.XContentType;
3740
import org.elasticsearch.common.xcontent.json.JsonXContent;
3841
import org.elasticsearch.index.engine.DocumentMissingException;
42+
import org.elasticsearch.index.engine.VersionConflictEngineException;
3943
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
4044
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
4145
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
4246
import org.elasticsearch.xpack.core.watcher.condition.Condition;
4347
import org.elasticsearch.xpack.core.watcher.execution.ExecutionState;
4448
import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch;
49+
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
4550
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
4651
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
52+
import org.elasticsearch.xpack.core.watcher.execution.Wid;
53+
import org.elasticsearch.xpack.core.watcher.history.HistoryStoreField;
4754
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
4855
import org.elasticsearch.xpack.core.watcher.input.Input;
56+
import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams;
4957
import org.elasticsearch.xpack.core.watcher.transform.Transform;
5058
import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
5159
import org.elasticsearch.xpack.core.watcher.watch.Watch;
@@ -67,8 +75,10 @@
6775
import java.util.LinkedList;
6876
import java.util.List;
6977
import java.util.Map;
78+
import java.util.concurrent.ExecutionException;
7079
import java.util.concurrent.ExecutorService;
7180
import java.util.concurrent.TimeUnit;
81+
import java.util.concurrent.TimeoutException;
7282
import java.util.concurrent.atomic.AtomicBoolean;
7383
import java.util.concurrent.atomic.AtomicReference;
7484

@@ -399,22 +409,68 @@ private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch trigge
399409
try {
400410
executor.execute(new WatchExecutionTask(ctx, () -> execute(ctx)));
401411
} catch (EsRejectedExecutionException e) {
402-
String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity";
403-
WatchRecord record = ctx.abortBeforeExecution(ExecutionState.THREADPOOL_REJECTION, message);
404-
try {
405-
if (ctx.overrideRecordOnConflict()) {
406-
historyStore.forcePut(record);
407-
} else {
408-
historyStore.put(record);
412+
//Using the generic pool here since this can happen from a write thread and we don't want to block a write
413+
//thread to kick off these additional write/delete requests.
414+
//Intentionally not using the HistoryStore or TriggerWatchStore to avoid re-using the same synchronous
415+
//BulkProcessor which can cause a deadlock see #41390
416+
genericExecutor.execute(new WatchExecutionTask(ctx, () -> {
417+
String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity";
418+
logger.warn(message);
419+
WatchRecord record = ctx.abortBeforeExecution(ExecutionState.THREADPOOL_REJECTION, message);
420+
try {
421+
forcePutHistory(record);
422+
} catch (Exception exc) {
423+
logger.error((Supplier<?>) () ->
424+
new ParameterizedMessage(
425+
"Error storing watch history record for watch [{}] after thread pool rejection",
426+
triggeredWatch.id()), exc);
409427
}
410-
} catch (Exception exc) {
411-
logger.error((Supplier<?>) () ->
412-
new ParameterizedMessage("Error storing watch history record for watch [{}] after thread pool rejection",
413-
triggeredWatch.id()), exc);
428+
deleteTrigger(triggeredWatch.id());
429+
}));
430+
}
431+
}
432+
433+
/**
434+
* Stores the specified watchRecord.
435+
* Any existing watchRecord will be overwritten.
436+
*/
437+
private void forcePutHistory(WatchRecord watchRecord) {
438+
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
439+
try {
440+
try (XContentBuilder builder = XContentFactory.jsonBuilder();
441+
ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) {
442+
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);
443+
IndexRequest request = new IndexRequest(index)
444+
.id(watchRecord.id().value())
445+
.source(builder)
446+
.opType(IndexRequest.OpType.CREATE);
447+
client.index(request).get(30, TimeUnit.SECONDS);
448+
logger.debug("indexed watch history record [{}]", watchRecord.id().value());
449+
} catch (VersionConflictEngineException vcee) {
450+
watchRecord = new WatchRecord.MessageWatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES,
451+
"watch record [{ " + watchRecord.id() + " }] has been stored before, previous state [" + watchRecord.state() + "]");
452+
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder();
453+
ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) {
454+
IndexRequest request = new IndexRequest(index)
455+
.id(watchRecord.id().value())
456+
.source(xContentBuilder.value(watchRecord));
457+
client.index(request).get(30, TimeUnit.SECONDS);
458+
}
459+
logger.debug("overwrote watch history record [{}]", watchRecord.id().value());
414460
}
461+
} catch (InterruptedException | ExecutionException | TimeoutException | IOException ioe) {
462+
final WatchRecord wr = watchRecord;
463+
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to persist watch record [{}]", wr), ioe);
464+
}
465+
}
415466

416-
triggeredWatchStore.delete(triggeredWatch.id());
467+
private void deleteTrigger(Wid watcherId) {
468+
DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME);
469+
request.id(watcherId.value());
470+
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) {
471+
client.delete(request).actionGet(30, TimeUnit.SECONDS);
417472
}
473+
logger.trace("successfully deleted triggered watch with id [{}]", watcherId);
418474
}
419475

420476
WatchRecord executeInner(WatchExecutionContext ctx) {

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
*/
66
package org.elasticsearch.xpack.watcher.execution;
77

8-
import org.elasticsearch.ElasticsearchException;
98
import org.elasticsearch.Version;
9+
import org.elasticsearch.action.ActionFuture;
1010
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.delete.DeleteRequest;
1112
import org.elasticsearch.action.get.GetRequest;
1213
import org.elasticsearch.action.get.GetResponse;
14+
import org.elasticsearch.action.index.IndexRequest;
1315
import org.elasticsearch.action.support.PlainActionFuture;
1416
import org.elasticsearch.action.update.UpdateRequest;
1517
import org.elasticsearch.action.update.UpdateResponse;
@@ -28,8 +30,11 @@
2830
import org.elasticsearch.common.xcontent.XContentFactory;
2931
import org.elasticsearch.common.xcontent.XContentParser;
3032
import org.elasticsearch.common.xcontent.XContentType;
33+
import org.elasticsearch.index.Index;
3134
import org.elasticsearch.index.IndexNotFoundException;
35+
import org.elasticsearch.index.engine.VersionConflictEngineException;
3236
import org.elasticsearch.index.get.GetResult;
37+
import org.elasticsearch.index.shard.ShardId;
3338
import org.elasticsearch.test.ESTestCase;
3439
import org.elasticsearch.threadpool.ThreadPool;
3540
import org.elasticsearch.xpack.core.security.authc.Authentication;
@@ -48,6 +53,7 @@
4853
import org.elasticsearch.xpack.core.watcher.execution.ExecutionPhase;
4954
import org.elasticsearch.xpack.core.watcher.execution.ExecutionState;
5055
import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch;
56+
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
5157
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
5258
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
5359
import org.elasticsearch.xpack.core.watcher.execution.Wid;
@@ -91,6 +97,7 @@
9197
import static java.util.Collections.singletonMap;
9298
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
9399
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
100+
import static org.hamcrest.Matchers.containsString;
94101
import static org.hamcrest.Matchers.equalTo;
95102
import static org.hamcrest.Matchers.greaterThan;
96103
import static org.hamcrest.Matchers.hasSize;
@@ -844,22 +851,74 @@ public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exc
844851
when(getResponse.isExists()).thenReturn(true);
845852
when(getResponse.getId()).thenReturn("foo");
846853
mockGetWatchResponse(client, "foo", getResponse);
854+
ActionFuture actionFuture = mock(ActionFuture.class);
855+
when(actionFuture.get()).thenReturn("");
856+
when(client.index(any())).thenReturn(actionFuture);
857+
when(client.delete(any())).thenReturn(actionFuture);
858+
847859
when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch);
848860

849-
// execute needs to fail as well as storing the history
861+
// execute needs to fail
850862
doThrow(new EsRejectedExecutionException()).when(executor).execute(any());
851-
doThrow(new ElasticsearchException("whatever")).when(historyStore).forcePut(any());
852863

853864
Wid wid = new Wid(watch.id(), ZonedDateTime.now(ZoneOffset.UTC));
854865

855866
TriggeredWatch triggeredWatch = new TriggeredWatch(wid,
856867
new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC) ,ZonedDateTime.now(ZoneOffset.UTC)));
857868
executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch));
858869

859-
verify(triggeredWatchStore, times(1)).delete(wid);
860-
ArgumentCaptor<WatchRecord> captor = ArgumentCaptor.forClass(WatchRecord.class);
861-
verify(historyStore, times(1)).forcePut(captor.capture());
862-
assertThat(captor.getValue().state(), is(ExecutionState.THREADPOOL_REJECTION));
870+
ArgumentCaptor<DeleteRequest> deleteCaptor = ArgumentCaptor.forClass(DeleteRequest.class);
871+
verify(client).delete(deleteCaptor.capture());
872+
assertThat(deleteCaptor.getValue().index(), equalTo(TriggeredWatchStoreField.INDEX_NAME));
873+
assertThat(deleteCaptor.getValue().id(), equalTo(wid.value()));
874+
875+
ArgumentCaptor<IndexRequest> watchHistoryCaptor = ArgumentCaptor.forClass(IndexRequest.class);
876+
verify(client).index(watchHistoryCaptor.capture());
877+
878+
assertThat(watchHistoryCaptor.getValue().source().utf8ToString(), containsString(ExecutionState.THREADPOOL_REJECTION.toString()));
879+
assertThat(watchHistoryCaptor.getValue().index(), containsString(".watcher-history"));
880+
}
881+
882+
public void testForcePutHistoryOnExecutionRejection() throws Exception {
883+
Watch watch = mock(Watch.class);
884+
when(watch.id()).thenReturn("foo");
885+
WatchStatus status = new WatchStatus(ZonedDateTime.now(ZoneOffset.UTC), Collections.emptyMap());
886+
when(watch.status()).thenReturn(status);
887+
GetResponse getResponse = mock(GetResponse.class);
888+
when(getResponse.isExists()).thenReturn(true);
889+
when(getResponse.getId()).thenReturn("foo");
890+
mockGetWatchResponse(client, "foo", getResponse);
891+
ActionFuture actionFuture = mock(ActionFuture.class);
892+
when(actionFuture.get()).thenReturn("");
893+
when(client.index(any()))
894+
.thenThrow(new VersionConflictEngineException(
895+
new ShardId(new Index("mockindex", "mockuuid"), 0), "id", "explaination"))
896+
.thenReturn(actionFuture);
897+
when(client.delete(any())).thenReturn(actionFuture);
898+
899+
when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch);
900+
901+
// execute needs to fail
902+
doThrow(new EsRejectedExecutionException()).when(executor).execute(any());
903+
904+
Wid wid = new Wid(watch.id(), ZonedDateTime.now(ZoneOffset.UTC));
905+
906+
TriggeredWatch triggeredWatch = new TriggeredWatch(wid,
907+
new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC), ZonedDateTime.now(ZoneOffset.UTC)));
908+
executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch));
909+
910+
ArgumentCaptor<DeleteRequest> deleteCaptor = ArgumentCaptor.forClass(DeleteRequest.class);
911+
verify(client).delete(deleteCaptor.capture());
912+
assertThat(deleteCaptor.getValue().index(), equalTo(TriggeredWatchStoreField.INDEX_NAME));
913+
assertThat(deleteCaptor.getValue().id(), equalTo(wid.value()));
914+
915+
ArgumentCaptor<IndexRequest> watchHistoryCaptor = ArgumentCaptor.forClass(IndexRequest.class);
916+
verify(client, times(2)).index(watchHistoryCaptor.capture());
917+
List<IndexRequest> indexRequests = watchHistoryCaptor.getAllValues();
918+
919+
assertThat(indexRequests.get(0).id(), equalTo(indexRequests.get(1).id()));
920+
assertThat(indexRequests.get(0).source().utf8ToString(), containsString(ExecutionState.THREADPOOL_REJECTION.toString()));
921+
assertThat(indexRequests.get(1).source().utf8ToString(), containsString(ExecutionState.EXECUTED_MULTIPLE_TIMES.toString()));
863922
}
864923

865924
public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exception {
@@ -898,7 +957,7 @@ public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exce
898957
when(watch.status()).thenReturn(watchStatus);
899958

900959
executionService.execute(context);
901-
verify(triggeredWatchStore, never()).delete(any());
960+
verify(client, never()).delete(any());
902961
}
903962

904963
public void testThatSingleWatchCannotBeExecutedConcurrently() throws Exception {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.watcher.test.integration;
7+
8+
import org.elasticsearch.action.search.SearchResponse;
9+
import org.elasticsearch.common.settings.Settings;
10+
import org.elasticsearch.license.LicenseService;
11+
import org.elasticsearch.xpack.core.XPackSettings;
12+
import org.elasticsearch.xpack.core.watcher.client.WatcherClient;
13+
import org.elasticsearch.xpack.watcher.condition.CompareCondition;
14+
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
15+
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
16+
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
17+
18+
import java.util.concurrent.TimeUnit;
19+
20+
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
21+
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
22+
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
23+
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
24+
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
25+
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
26+
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
27+
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
28+
import static org.hamcrest.Matchers.equalTo;
29+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
30+
31+
public class RejectedExecutionTests extends AbstractWatcherIntegrationTestCase {
32+
33+
@Override
34+
protected boolean timeWarped() {
35+
//need to use the real scheduler
36+
return false;
37+
}
38+
39+
public void testHistoryAndTriggeredOnRejection() throws Exception {
40+
WatcherClient watcherClient = watcherClient();
41+
createIndex("idx");
42+
client().prepareIndex("idx", "_doc").setSource("field", "a").get();
43+
refresh();
44+
WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "a")), "idx");
45+
watcherClient.preparePutWatch(randomAlphaOfLength(5))
46+
.setSource(watchBuilder()
47+
.trigger(schedule(interval(1, IntervalSchedule.Interval.Unit.SECONDS)))
48+
.input(searchInput(request))
49+
.condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.EQ, 1L))
50+
.addAction("_logger", loggingAction("_logging")
51+
.setCategory("_category")))
52+
.get();
53+
54+
assertBusy(() -> {
55+
flushAndRefresh(".watcher-history-*");
56+
SearchResponse searchResponse = client().prepareSearch(".watcher-history-*").get();
57+
assertThat(searchResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(2L));
58+
}, 10, TimeUnit.SECONDS);
59+
60+
flushAndRefresh(".triggered_watches");
61+
SearchResponse searchResponse = client().prepareSearch(".triggered_watches").get();
62+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
63+
}
64+
65+
@Override
66+
protected Settings nodeSettings(int nodeOrdinal) {
67+
68+
return Settings.builder()
69+
.put(super.nodeSettings(nodeOrdinal))
70+
.put(XPackSettings.MONITORING_ENABLED.getKey(), false)
71+
.put(XPackSettings.SECURITY_ENABLED.getKey(), false)
72+
.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial")
73+
.put("thread_pool.write.size", 1)
74+
.put("thread_pool.write.queue_size", 1)
75+
.put("xpack.watcher.thread_pool.size", 1)
76+
.put("xpack.watcher.thread_pool.queue_size", 0)
77+
.build();
78+
}
79+
80+
81+
}

0 commit comments

Comments
 (0)