|
5 | 5 | */
|
6 | 6 | package org.elasticsearch.xpack.watcher.execution;
|
7 | 7 |
|
8 |
| -import org.elasticsearch.ElasticsearchException; |
9 | 8 | import org.elasticsearch.Version;
|
| 9 | +import org.elasticsearch.action.ActionFuture; |
10 | 10 | import org.elasticsearch.action.ActionListener;
|
| 11 | +import org.elasticsearch.action.delete.DeleteRequest; |
11 | 12 | import org.elasticsearch.action.get.GetRequest;
|
12 | 13 | import org.elasticsearch.action.get.GetResponse;
|
| 14 | +import org.elasticsearch.action.index.IndexRequest; |
13 | 15 | import org.elasticsearch.action.support.PlainActionFuture;
|
14 | 16 | import org.elasticsearch.action.update.UpdateRequest;
|
15 | 17 | import org.elasticsearch.action.update.UpdateResponse;
|
|
28 | 30 | import org.elasticsearch.common.xcontent.XContentFactory;
|
29 | 31 | import org.elasticsearch.common.xcontent.XContentParser;
|
30 | 32 | import org.elasticsearch.common.xcontent.XContentType;
|
| 33 | +import org.elasticsearch.index.Index; |
31 | 34 | import org.elasticsearch.index.IndexNotFoundException;
|
| 35 | +import org.elasticsearch.index.engine.VersionConflictEngineException; |
32 | 36 | import org.elasticsearch.index.get.GetResult;
|
| 37 | +import org.elasticsearch.index.shard.ShardId; |
33 | 38 | import org.elasticsearch.test.ESTestCase;
|
34 | 39 | import org.elasticsearch.threadpool.ThreadPool;
|
35 | 40 | import org.elasticsearch.xpack.core.security.authc.Authentication;
|
|
48 | 53 | import org.elasticsearch.xpack.core.watcher.execution.ExecutionPhase;
|
49 | 54 | import org.elasticsearch.xpack.core.watcher.execution.ExecutionState;
|
50 | 55 | import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch;
|
| 56 | +import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; |
51 | 57 | import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
|
52 | 58 | import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
|
53 | 59 | import org.elasticsearch.xpack.core.watcher.execution.Wid;
|
|
91 | 97 | import static java.util.Collections.singletonMap;
|
92 | 98 | import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
93 | 99 | import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
|
| 100 | +import static org.hamcrest.Matchers.containsString; |
94 | 101 | import static org.hamcrest.Matchers.equalTo;
|
95 | 102 | import static org.hamcrest.Matchers.greaterThan;
|
96 | 103 | import static org.hamcrest.Matchers.hasSize;
|
@@ -844,22 +851,74 @@ public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exc
|
844 | 851 | when(getResponse.isExists()).thenReturn(true);
|
845 | 852 | when(getResponse.getId()).thenReturn("foo");
|
846 | 853 | mockGetWatchResponse(client, "foo", getResponse);
|
| 854 | + ActionFuture actionFuture = mock(ActionFuture.class); |
| 855 | + when(actionFuture.get()).thenReturn(""); |
| 856 | + when(client.index(any())).thenReturn(actionFuture); |
| 857 | + when(client.delete(any())).thenReturn(actionFuture); |
| 858 | + |
847 | 859 | when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch);
|
848 | 860 |
|
849 |
| - // execute needs to fail as well as storing the history |
| 861 | + // execute needs to fail |
850 | 862 | doThrow(new EsRejectedExecutionException()).when(executor).execute(any());
|
851 |
| - doThrow(new ElasticsearchException("whatever")).when(historyStore).forcePut(any()); |
852 | 863 |
|
853 | 864 | Wid wid = new Wid(watch.id(), ZonedDateTime.now(ZoneOffset.UTC));
|
854 | 865 |
|
855 | 866 | TriggeredWatch triggeredWatch = new TriggeredWatch(wid,
|
856 | 867 | new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC) ,ZonedDateTime.now(ZoneOffset.UTC)));
|
857 | 868 | executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch));
|
858 | 869 |
|
859 |
| - verify(triggeredWatchStore, times(1)).delete(wid); |
860 |
| - ArgumentCaptor<WatchRecord> captor = ArgumentCaptor.forClass(WatchRecord.class); |
861 |
| - verify(historyStore, times(1)).forcePut(captor.capture()); |
862 |
| - assertThat(captor.getValue().state(), is(ExecutionState.THREADPOOL_REJECTION)); |
| 870 | + ArgumentCaptor<DeleteRequest> deleteCaptor = ArgumentCaptor.forClass(DeleteRequest.class); |
| 871 | + verify(client).delete(deleteCaptor.capture()); |
| 872 | + assertThat(deleteCaptor.getValue().index(), equalTo(TriggeredWatchStoreField.INDEX_NAME)); |
| 873 | + assertThat(deleteCaptor.getValue().id(), equalTo(wid.value())); |
| 874 | + |
| 875 | + ArgumentCaptor<IndexRequest> watchHistoryCaptor = ArgumentCaptor.forClass(IndexRequest.class); |
| 876 | + verify(client).index(watchHistoryCaptor.capture()); |
| 877 | + |
| 878 | + assertThat(watchHistoryCaptor.getValue().source().utf8ToString(), containsString(ExecutionState.THREADPOOL_REJECTION.toString())); |
| 879 | + assertThat(watchHistoryCaptor.getValue().index(), containsString(".watcher-history")); |
| 880 | + } |
| 881 | + |
| 882 | + public void testForcePutHistoryOnExecutionRejection() throws Exception { |
| 883 | + Watch watch = mock(Watch.class); |
| 884 | + when(watch.id()).thenReturn("foo"); |
| 885 | + WatchStatus status = new WatchStatus(ZonedDateTime.now(ZoneOffset.UTC), Collections.emptyMap()); |
| 886 | + when(watch.status()).thenReturn(status); |
| 887 | + GetResponse getResponse = mock(GetResponse.class); |
| 888 | + when(getResponse.isExists()).thenReturn(true); |
| 889 | + when(getResponse.getId()).thenReturn("foo"); |
| 890 | + mockGetWatchResponse(client, "foo", getResponse); |
| 891 | + ActionFuture actionFuture = mock(ActionFuture.class); |
| 892 | + when(actionFuture.get()).thenReturn(""); |
| 893 | + when(client.index(any())) |
| 894 | + .thenThrow(new VersionConflictEngineException( |
| 895 | + new ShardId(new Index("mockindex", "mockuuid"), 0), "id", "explaination")) |
| 896 | + .thenReturn(actionFuture); |
| 897 | + when(client.delete(any())).thenReturn(actionFuture); |
| 898 | + |
| 899 | + when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); |
| 900 | + |
| 901 | + // execute needs to fail |
| 902 | + doThrow(new EsRejectedExecutionException()).when(executor).execute(any()); |
| 903 | + |
| 904 | + Wid wid = new Wid(watch.id(), ZonedDateTime.now(ZoneOffset.UTC)); |
| 905 | + |
| 906 | + TriggeredWatch triggeredWatch = new TriggeredWatch(wid, |
| 907 | + new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC), ZonedDateTime.now(ZoneOffset.UTC))); |
| 908 | + executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch)); |
| 909 | + |
| 910 | + ArgumentCaptor<DeleteRequest> deleteCaptor = ArgumentCaptor.forClass(DeleteRequest.class); |
| 911 | + verify(client).delete(deleteCaptor.capture()); |
| 912 | + assertThat(deleteCaptor.getValue().index(), equalTo(TriggeredWatchStoreField.INDEX_NAME)); |
| 913 | + assertThat(deleteCaptor.getValue().id(), equalTo(wid.value())); |
| 914 | + |
| 915 | + ArgumentCaptor<IndexRequest> watchHistoryCaptor = ArgumentCaptor.forClass(IndexRequest.class); |
| 916 | + verify(client, times(2)).index(watchHistoryCaptor.capture()); |
| 917 | + List<IndexRequest> indexRequests = watchHistoryCaptor.getAllValues(); |
| 918 | + |
| 919 | + assertThat(indexRequests.get(0).id(), equalTo(indexRequests.get(1).id())); |
| 920 | + assertThat(indexRequests.get(0).source().utf8ToString(), containsString(ExecutionState.THREADPOOL_REJECTION.toString())); |
| 921 | + assertThat(indexRequests.get(1).source().utf8ToString(), containsString(ExecutionState.EXECUTED_MULTIPLE_TIMES.toString())); |
863 | 922 | }
|
864 | 923 |
|
865 | 924 | public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exception {
|
@@ -898,7 +957,7 @@ public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exce
|
898 | 957 | when(watch.status()).thenReturn(watchStatus);
|
899 | 958 |
|
900 | 959 | executionService.execute(context);
|
901 |
| - verify(triggeredWatchStore, never()).delete(any()); |
| 960 | + verify(client, never()).delete(any()); |
902 | 961 | }
|
903 | 962 |
|
904 | 963 | public void testThatSingleWatchCannotBeExecutedConcurrently() throws Exception {
|
|
0 commit comments