|
15 | 15 | import org.elasticsearch.action.ActionListener;
|
16 | 16 | import org.elasticsearch.action.bulk.BulkItemResponse;
|
17 | 17 | import org.elasticsearch.action.bulk.BulkResponse;
|
| 18 | +import org.elasticsearch.action.delete.DeleteRequest; |
18 | 19 | import org.elasticsearch.action.get.GetRequest;
|
19 | 20 | import org.elasticsearch.action.get.GetResponse;
|
| 21 | +import org.elasticsearch.action.index.IndexRequest; |
20 | 22 | import org.elasticsearch.action.support.PlainActionFuture;
|
21 | 23 | import org.elasticsearch.action.update.UpdateRequest;
|
22 | 24 | import org.elasticsearch.client.Client;
|
|
32 | 34 | import org.elasticsearch.common.util.concurrent.ThreadContext;
|
33 | 35 | import org.elasticsearch.common.xcontent.ToXContent;
|
34 | 36 | import org.elasticsearch.common.xcontent.XContentBuilder;
|
| 37 | +import org.elasticsearch.common.xcontent.XContentFactory; |
35 | 38 | import org.elasticsearch.common.xcontent.XContentType;
|
36 | 39 | import org.elasticsearch.common.xcontent.json.JsonXContent;
|
37 | 40 | import org.elasticsearch.index.engine.DocumentMissingException;
|
| 41 | +import org.elasticsearch.index.engine.VersionConflictEngineException; |
38 | 42 | import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
|
39 | 43 | import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
|
40 | 44 | import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
|
41 | 45 | import org.elasticsearch.xpack.core.watcher.condition.Condition;
|
42 | 46 | import org.elasticsearch.xpack.core.watcher.execution.ExecutionState;
|
43 | 47 | import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch;
|
| 48 | +import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; |
44 | 49 | import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
|
45 | 50 | import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
|
| 51 | +import org.elasticsearch.xpack.core.watcher.execution.Wid; |
| 52 | +import org.elasticsearch.xpack.core.watcher.history.HistoryStoreField; |
46 | 53 | import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
|
47 | 54 | import org.elasticsearch.xpack.core.watcher.input.Input;
|
| 55 | +import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams; |
48 | 56 | import org.elasticsearch.xpack.core.watcher.transform.Transform;
|
49 | 57 | import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
|
50 | 58 | import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
|
66 | 74 | import java.util.LinkedList;
|
67 | 75 | import java.util.List;
|
68 | 76 | import java.util.Map;
|
| 77 | +import java.util.concurrent.ExecutionException; |
69 | 78 | import java.util.concurrent.ExecutorService;
|
70 | 79 | import java.util.concurrent.TimeUnit;
|
| 80 | +import java.util.concurrent.TimeoutException; |
71 | 81 | import java.util.concurrent.atomic.AtomicBoolean;
|
72 | 82 | import java.util.concurrent.atomic.AtomicReference;
|
| 83 | +import java.util.concurrent.locks.Lock; |
| 84 | +import java.util.concurrent.locks.ReadWriteLock; |
| 85 | +import java.util.concurrent.locks.ReentrantReadWriteLock; |
73 | 86 |
|
74 | 87 | import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;
|
75 | 88 | import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
|
@@ -97,6 +110,8 @@ public class ExecutionService {
|
97 | 110 | private final Client client;
|
98 | 111 | private final WatchExecutor executor;
|
99 | 112 | private final ExecutorService genericExecutor;
|
| 113 | + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); |
| 114 | + private final Lock putUpdateLock = readWriteLock.readLock(); |
100 | 115 |
|
101 | 116 | private AtomicReference<CurrentExecutions> currentExecutions = new AtomicReference<>();
|
102 | 117 | private final AtomicBoolean paused = new AtomicBoolean(false);
|
@@ -399,22 +414,70 @@ private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch trigge
|
399 | 414 | try {
|
400 | 415 | executor.execute(new WatchExecutionTask(ctx, () -> execute(ctx)));
|
401 | 416 | } catch (EsRejectedExecutionException e) {
|
402 |
| - String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; |
403 |
| - WatchRecord record = ctx.abortBeforeExecution(ExecutionState.THREADPOOL_REJECTION, message); |
404 |
| - try { |
405 |
| - if (ctx.overrideRecordOnConflict()) { |
406 |
| - historyStore.forcePut(record); |
407 |
| - } else { |
408 |
| - historyStore.put(record); |
| 417 | + //Using the generic pool here since this can happen from a write thread and we don't want to block a write |
| 418 | + //thread to kick off these additional write/delete requests. |
| 419 | + //Intentionally not using the HistoryStore or TriggerWatchStore to avoid re-using the same synchronous |
| 420 | + //BulkProcessor which can cause a deadlock see #41390 |
| 421 | + genericExecutor.execute(new WatchExecutionTask(ctx, () -> { |
| 422 | + String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; |
| 423 | + WatchRecord record = ctx.abortBeforeExecution(ExecutionState.THREADPOOL_REJECTION, message); |
| 424 | + try { |
| 425 | + forcePutHistory(record); |
| 426 | + } catch (Exception exc) { |
| 427 | + logger.error((Supplier<?>) () -> |
| 428 | + new ParameterizedMessage( |
| 429 | + "Error storing watch history record for watch [{}] after thread pool rejection", |
| 430 | + triggeredWatch.id()), exc); |
409 | 431 | }
|
410 |
| - } catch (Exception exc) { |
411 |
| - logger.error((Supplier<?>) () -> |
412 |
| - new ParameterizedMessage("Error storing watch history record for watch [{}] after thread pool rejection", |
413 |
| - triggeredWatch.id()), exc); |
| 432 | + deleteTrigger(triggeredWatch.id()); |
| 433 | + })); |
| 434 | + } |
| 435 | + } |
| 436 | + |
| 437 | + /** |
| 438 | + * Stores the specified watchRecord. |
| 439 | + * Any existing watchRecord will be overwritten. |
| 440 | + */ |
| 441 | + private void forcePutHistory(WatchRecord watchRecord) { |
| 442 | + String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); |
| 443 | + putUpdateLock.lock(); |
| 444 | + try { |
| 445 | + try (XContentBuilder builder = XContentFactory.jsonBuilder(); |
| 446 | + ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) { |
| 447 | + watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS); |
| 448 | + IndexRequest request = new IndexRequest(index) |
| 449 | + .id(watchRecord.id().value()) |
| 450 | + .source(builder) |
| 451 | + .opType(IndexRequest.OpType.CREATE); |
| 452 | + client.index(request).get(30, TimeUnit.SECONDS); |
| 453 | + logger.debug("indexed watch history record [{}]", watchRecord.id().value()); |
| 454 | + } catch (VersionConflictEngineException vcee) { |
| 455 | + watchRecord = new WatchRecord.MessageWatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES, |
| 456 | + "watch record [{ " + watchRecord.id() + " }] has been stored before, previous state [" + watchRecord.state() + "]"); |
| 457 | + try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder(); |
| 458 | + ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { |
| 459 | + IndexRequest request = new IndexRequest(index) |
| 460 | + .id(watchRecord.id().value()) |
| 461 | + .source(xContentBuilder.value(watchRecord)); |
| 462 | + client.index(request).get(30, TimeUnit.SECONDS); |
| 463 | + } |
| 464 | + logger.debug("overwrote watch history record [{}]", watchRecord.id().value()); |
414 | 465 | }
|
| 466 | + } catch (InterruptedException | ExecutionException | TimeoutException | IOException ioe) { |
| 467 | + final WatchRecord wr = watchRecord; |
| 468 | + logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to persist watch record [{}]", wr), ioe); |
| 469 | + } finally { |
| 470 | + putUpdateLock.unlock(); |
| 471 | + } |
| 472 | + } |
415 | 473 |
|
416 |
| - triggeredWatchStore.delete(triggeredWatch.id()); |
| 474 | + private void deleteTrigger(Wid watcherId) { |
| 475 | + DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME); |
| 476 | + request.id(watcherId.value()); |
| 477 | + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) { |
| 478 | + client.delete(request).actionGet(30, TimeUnit.SECONDS); |
417 | 479 | }
|
| 480 | + logger.trace("successfully deleted triggered watch with id [{}]", watcherId); |
418 | 481 | }
|
419 | 482 |
|
420 | 483 | WatchRecord executeInner(WatchExecutionContext ctx) {
|
|
0 commit comments