Skip to content

Commit 1de2a92

Browse files
authored
Watcher: Ensure that execution triggers properly on initial setup (#33360)
This commit reverts most of #33157 as it introduces another race condition and breaks a common case of watcher, when the first watch is added to the system and the index does not exist yet. This means, that the index will be created, which triggers a reload, but during this time the put watch operation that triggered this is not yet indexed, so that both processes finish roughly add the same time and should not overwrite each other but act complementary. This commit reverts the logic of cleaning out the ticker engine watches on start up, as this is done already when the execution is paused - which also gets paused on the cluster state listener again, as we can be sure here, that the watches index has not yet been created. This also adds a new test, that starts a one node cluster and emulates the case of a non existing watches index and a watch being added, which should result in proper execution. Closes #33320
1 parent 9c03168 commit 1de2a92

File tree

7 files changed

+147
-104
lines changed

7 files changed

+147
-104
lines changed

test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,7 @@ private static boolean isXPackTemplate(String name) {
635635
if (name.startsWith(".monitoring-")) {
636636
return true;
637637
}
638-
if (name.startsWith(".watch-history-")) {
638+
if (name.startsWith(".watch") || name.startsWith(".triggered_watches")) {
639639
return true;
640640
}
641641
if (name.startsWith(".ml-")) {

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@
2727
import org.elasticsearch.common.unit.TimeValue;
2828
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2929
import org.elasticsearch.common.util.concurrent.EsExecutors;
30-
import org.elasticsearch.common.util.concurrent.ThreadContext;
3130
import org.elasticsearch.common.xcontent.XContentType;
3231
import org.elasticsearch.search.SearchHit;
3332
import org.elasticsearch.search.builder.SearchSourceBuilder;
3433
import org.elasticsearch.search.sort.SortBuilders;
3534
import org.elasticsearch.threadpool.ThreadPool;
35+
import org.elasticsearch.xpack.core.ClientHelper;
3636
import org.elasticsearch.xpack.core.upgrade.UpgradeField;
3737
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
3838
import org.elasticsearch.xpack.core.watcher.watch.Watch;
@@ -63,7 +63,6 @@
6363
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
6464
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
6565
import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;
66-
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
6766
import static org.elasticsearch.xpack.core.watcher.support.Exceptions.illegalState;
6867
import static org.elasticsearch.xpack.core.watcher.watch.Watch.INDEX;
6968

@@ -92,7 +91,7 @@ public class WatcherService extends AbstractComponent {
9291
this.scrollSize = settings.getAsInt("xpack.watcher.watch.scroll.size", 100);
9392
this.defaultSearchTimeout = settings.getAsTime("xpack.watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30));
9493
this.parser = parser;
95-
this.client = client;
94+
this.client = ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN);
9695
this.executor = executor;
9796
}
9897

@@ -184,6 +183,10 @@ void reload(ClusterState state, String reason) {
184183
// changes
185184
processedClusterStateVersion.set(state.getVersion());
186185

186+
triggerService.pauseExecution();
187+
int cancelledTaskCount = executionService.clearExecutionsAndQueue();
188+
logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
189+
187190
executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false),
188191
e -> logger.error("error reloading watcher", e)));
189192
}
@@ -232,10 +235,6 @@ private synchronized boolean reloadInner(ClusterState state, String reason, bool
232235
// also this is the place where we pause the trigger service execution and clear the current execution service, so that we make sure
233236
// that existing executions finish, but no new ones are executed
234237
if (processedClusterStateVersion.get() == state.getVersion()) {
235-
triggerService.pauseExecution();
236-
int cancelledTaskCount = executionService.clearExecutionsAndQueue();
237-
logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
238-
239238
executionService.unPause();
240239
triggerService.start(watches);
241240
if (triggeredWatches.isEmpty() == false) {
@@ -273,7 +272,7 @@ private Collection<Watch> loadWatches(ClusterState clusterState) {
273272

274273
SearchResponse response = null;
275274
List<Watch> watches = new ArrayList<>();
276-
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
275+
try {
277276
RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(INDEX))
278277
.actionGet(TimeValue.timeValueSeconds(5));
279278
if (refreshResponse.getSuccessfulShards() < indexMetaData.getNumberOfShards()) {
@@ -357,11 +356,9 @@ private Collection<Watch> loadWatches(ClusterState clusterState) {
357356
}
358357
} finally {
359358
if (response != null) {
360-
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
361-
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
362-
clearScrollRequest.addScrollId(response.getScrollId());
363-
client.clearScroll(clearScrollRequest).actionGet(scrollTimeout);
364-
}
359+
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
360+
clearScrollRequest.addScrollId(response.getScrollId());
361+
client.clearScroll(clearScrollRequest).actionGet(scrollTimeout);
365362
}
366363
}
367364

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.time.Clock;
2323
import java.util.ArrayList;
2424
import java.util.Collection;
25+
import java.util.HashMap;
2526
import java.util.List;
2627
import java.util.Map;
2728
import java.util.concurrent.ConcurrentHashMap;
@@ -49,14 +50,23 @@ public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleR
4950
@Override
5051
public synchronized void start(Collection<Watch> jobs) {
5152
long startTime = clock.millis();
52-
Map<String, ActiveSchedule> schedules = new ConcurrentHashMap<>();
53+
Map<String, ActiveSchedule> schedules = new HashMap<>(jobs.size());
5354
for (Watch job : jobs) {
5455
if (job.trigger() instanceof ScheduleTrigger) {
5556
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
5657
schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime));
5758
}
5859
}
59-
this.schedules = schedules;
60+
// why are we calling putAll() here instead of assigning a brand
61+
// new concurrent hash map you may ask yourself over here
62+
// This requires some explanation how TriggerEngine.start() is
63+
// invoked, when a reload due to the cluster state listener is done
64+
// If the watches index does not exist, and new document is stored,
65+
// then the creation of that index will trigger a reload which calls
66+
// this method. The index operation however will run at the same time
67+
// as the reload, so if we clean out the old data structure here,
68+
// that can lead to that one watch not being triggered
69+
this.schedules.putAll(schedules);
6070
}
6171

6272
@Override

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,21 @@
66
package org.elasticsearch.xpack.watcher;
77

88
import org.elasticsearch.Version;
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
911
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1012
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
13+
import org.elasticsearch.action.search.ClearScrollAction;
1114
import org.elasticsearch.action.search.ClearScrollRequest;
1215
import org.elasticsearch.action.search.ClearScrollResponse;
16+
import org.elasticsearch.action.search.SearchAction;
1317
import org.elasticsearch.action.search.SearchRequest;
1418
import org.elasticsearch.action.search.SearchResponse;
1519
import org.elasticsearch.action.search.SearchResponseSections;
20+
import org.elasticsearch.action.search.SearchScrollAction;
1621
import org.elasticsearch.action.search.SearchScrollRequest;
1722
import org.elasticsearch.action.search.ShardSearchFailure;
18-
import org.elasticsearch.action.support.PlainActionFuture;
19-
import org.elasticsearch.client.AdminClient;
2023
import org.elasticsearch.client.Client;
21-
import org.elasticsearch.client.IndicesAdminClient;
2224
import org.elasticsearch.cluster.ClusterName;
2325
import org.elasticsearch.cluster.ClusterState;
2426
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -42,7 +44,6 @@
4244
import org.elasticsearch.search.SearchShardTarget;
4345
import org.elasticsearch.test.ESTestCase;
4446
import org.elasticsearch.threadpool.ThreadPool;
45-
import org.elasticsearch.xpack.core.XPackSettings;
4647
import org.elasticsearch.xpack.core.watcher.trigger.Trigger;
4748
import org.elasticsearch.xpack.core.watcher.watch.Watch;
4849
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
@@ -55,6 +56,7 @@
5556
import org.elasticsearch.xpack.watcher.watch.WatchParser;
5657
import org.joda.time.DateTime;
5758
import org.joda.time.DateTimeZone;
59+
import org.junit.Before;
5860
import org.mockito.ArgumentCaptor;
5961

6062
import java.util.Collections;
@@ -67,6 +69,7 @@
6769
import static org.hamcrest.Matchers.is;
6870
import static org.mockito.Matchers.any;
6971
import static org.mockito.Matchers.eq;
72+
import static org.mockito.Mockito.doAnswer;
7073
import static org.mockito.Mockito.mock;
7174
import static org.mockito.Mockito.never;
7275
import static org.mockito.Mockito.verify;
@@ -76,14 +79,24 @@ public class WatcherServiceTests extends ESTestCase {
7679

7780
private final ExecutorService executorService = EsExecutors.newDirectExecutorService();
7881

82+
private final Client client = mock(Client.class);
83+
84+
@Before
85+
public void configureMockClient() {
86+
when(client.settings()).thenReturn(Settings.EMPTY);
87+
ThreadPool threadPool = mock(ThreadPool.class);
88+
when(client.threadPool()).thenReturn(threadPool);
89+
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
90+
}
91+
7992
public void testValidateStartWithClosedIndex() {
8093
TriggerService triggerService = mock(TriggerService.class);
8194
TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class);
8295
ExecutionService executionService = mock(ExecutionService.class);
8396
WatchParser parser = mock(WatchParser.class);
8497

8598
WatcherService service = new WatcherService(Settings.EMPTY, triggerService, triggeredWatchStore,
86-
executionService, parser, mock(Client.class), executorService) {
99+
executionService, parser, client, executorService) {
87100
@Override
88101
void stopExecutor() {
89102
}
@@ -102,18 +115,11 @@ void stopExecutor() {
102115
}
103116

104117
public void testLoadOnlyActiveWatches() throws Exception {
105-
// this is just, so we dont have to add any mocking to the threadpool
106-
Settings settings = Settings.builder().put(XPackSettings.SECURITY_ENABLED.getKey(), false).build();
107-
108118
TriggerService triggerService = mock(TriggerService.class);
109119
TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class);
110120
ExecutionService executionService = mock(ExecutionService.class);
111121
WatchParser parser = mock(WatchParser.class);
112-
Client client = mock(Client.class);
113-
ThreadPool threadPool = mock(ThreadPool.class);
114-
when(client.threadPool()).thenReturn(threadPool);
115-
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
116-
WatcherService service = new WatcherService(settings, triggerService, triggeredWatchStore,
122+
WatcherService service = new WatcherService(Settings.EMPTY, triggerService, triggeredWatchStore,
117123
executionService, parser, client, executorService) {
118124
@Override
119125
void stopExecutor() {
@@ -150,21 +156,21 @@ void stopExecutor() {
150156
RefreshResponse refreshResponse = mock(RefreshResponse.class);
151157
when(refreshResponse.getSuccessfulShards())
152158
.thenReturn(clusterState.getMetaData().getIndices().get(Watch.INDEX).getNumberOfShards());
153-
AdminClient adminClient = mock(AdminClient.class);
154-
IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
155-
when(client.admin()).thenReturn(adminClient);
156-
when(adminClient.indices()).thenReturn(indicesAdminClient);
157-
PlainActionFuture<RefreshResponse> refreshFuture = new PlainActionFuture<>();
158-
when(indicesAdminClient.refresh(any(RefreshRequest.class))).thenReturn(refreshFuture);
159-
refreshFuture.onResponse(refreshResponse);
159+
doAnswer(invocation -> {
160+
ActionListener<RefreshResponse> listener = (ActionListener<RefreshResponse>) invocation.getArguments()[2];
161+
listener.onResponse(refreshResponse);
162+
return null;
163+
}).when(client).execute(eq(RefreshAction.INSTANCE), any(RefreshRequest.class), any(ActionListener.class));
160164

161165
// empty scroll response, no further scrolling needed
162166
SearchResponseSections scrollSearchSections = new SearchResponseSections(SearchHits.empty(), null, null, false, false, null, 1);
163167
SearchResponse scrollSearchResponse = new SearchResponse(scrollSearchSections, "scrollId", 1, 1, 0, 10,
164168
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
165-
PlainActionFuture<SearchResponse> searchScrollResponseFuture = new PlainActionFuture<>();
166-
when(client.searchScroll(any(SearchScrollRequest.class))).thenReturn(searchScrollResponseFuture);
167-
searchScrollResponseFuture.onResponse(scrollSearchResponse);
169+
doAnswer(invocation -> {
170+
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocation.getArguments()[2];
171+
listener.onResponse(scrollSearchResponse);
172+
return null;
173+
}).when(client).execute(eq(SearchScrollAction.INSTANCE), any(SearchScrollRequest.class), any(ActionListener.class));
168174

169175
// one search response containing active and inactive watches
170176
int count = randomIntBetween(2, 200);
@@ -192,13 +198,17 @@ void stopExecutor() {
192198
SearchResponseSections sections = new SearchResponseSections(searchHits, null, null, false, false, null, 1);
193199
SearchResponse searchResponse = new SearchResponse(sections, "scrollId", 1, 1, 0, 10, ShardSearchFailure.EMPTY_ARRAY,
194200
SearchResponse.Clusters.EMPTY);
195-
PlainActionFuture<SearchResponse> searchResponseFuture = new PlainActionFuture<>();
196-
when(client.search(any(SearchRequest.class))).thenReturn(searchResponseFuture);
197-
searchResponseFuture.onResponse(searchResponse);
198-
199-
PlainActionFuture<ClearScrollResponse> clearScrollFuture = new PlainActionFuture<>();
200-
when(client.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollFuture);
201-
clearScrollFuture.onResponse(new ClearScrollResponse(true, 1));
201+
doAnswer(invocation -> {
202+
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocation.getArguments()[2];
203+
listener.onResponse(searchResponse);
204+
return null;
205+
}).when(client).execute(eq(SearchAction.INSTANCE), any(SearchRequest.class), any(ActionListener.class));
206+
207+
doAnswer(invocation -> {
208+
ActionListener<ClearScrollResponse> listener = (ActionListener<ClearScrollResponse>) invocation.getArguments()[2];
209+
listener.onResponse(new ClearScrollResponse(true, 1));
210+
return null;
211+
}).when(client).execute(eq(ClearScrollAction.INSTANCE), any(ClearScrollRequest.class), any(ActionListener.class));
202212

203213
service.start(clusterState, () -> {});
204214

@@ -228,7 +238,7 @@ public void testPausingWatcherServiceAlsoPausesTriggerService() {
228238
assertThat(triggerService.count(), is(1L));
229239

230240
WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class),
231-
mock(ExecutionService.class), mock(WatchParser.class), mock(Client.class), executorService) {
241+
mock(ExecutionService.class), mock(WatchParser.class), client, executorService) {
232242
@Override
233243
void stopExecutor() {
234244
}
@@ -245,7 +255,7 @@ public void testReloadingWatcherDoesNotPauseExecutionService() {
245255
ExecutionService executionService = mock(ExecutionService.class);
246256
TriggerService triggerService = mock(TriggerService.class);
247257
WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class),
248-
executionService, mock(WatchParser.class), mock(Client.class), executorService) {
258+
executionService, mock(WatchParser.class), client, executorService) {
249259
@Override
250260
void stopExecutor() {
251261
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.watcher.test.integration;
7+
8+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
9+
import org.elasticsearch.action.search.SearchResponse;
10+
import org.elasticsearch.cluster.metadata.IndexMetaData;
11+
import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse;
12+
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
13+
import org.elasticsearch.xpack.core.watcher.watch.Watch;
14+
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
15+
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
16+
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
17+
18+
import java.util.concurrent.TimeUnit;
19+
20+
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
21+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
22+
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
23+
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
24+
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
25+
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
26+
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
27+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
28+
import static org.hamcrest.Matchers.is;
29+
30+
@ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, maxNumDataNodes = 1, supportsDedicatedMasters = false)
31+
public class SingleNodeTests extends AbstractWatcherIntegrationTestCase {
32+
33+
@Override
34+
protected boolean timeWarped() {
35+
return false;
36+
}
37+
38+
// this is the standard setup when starting watcher in a regular cluster
39+
// the index does not exist, a watch gets added
40+
// the watch should be executed properly, despite the index being created and the cluster state listener being reloaded
41+
public void testThatLoadingWithNonExistingIndexWorks() throws Exception {
42+
stopWatcher();
43+
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
44+
IndexMetaData metaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, clusterStateResponse.getState().metaData());
45+
String watchIndexName = metaData.getIndex().getName();
46+
assertAcked(client().admin().indices().prepareDelete(watchIndexName));
47+
startWatcher();
48+
49+
String watchId = randomAlphaOfLength(20);
50+
// now we start with an empty set up, store a watch and expected it to be executed
51+
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch(watchId)
52+
.setSource(watchBuilder()
53+
.trigger(schedule(interval(1, IntervalSchedule.Interval.Unit.SECONDS)))
54+
.input(simpleInput())
55+
.addAction("_logger", loggingAction("logging of watch _name")))
56+
.get();
57+
assertThat(putWatchResponse.isCreated(), is(true));
58+
59+
assertBusy(() -> {
60+
client().admin().indices().prepareRefresh(".watcher-history*");
61+
SearchResponse searchResponse = client().prepareSearch(".watcher-history*").setSize(0).get();
62+
assertThat(searchResponse.getHits().getTotalHits(), is(greaterThanOrEqualTo(1L)));
63+
}, 5, TimeUnit.SECONDS);
64+
}
65+
66+
}

0 commit comments

Comments
 (0)