Skip to content

Commit e1119ad

Browse files
authored
Improve Watcher test framework resiliency (#40658)
It is possible for the watches tracked by ScheduleTriggerEngineMock to get out of sync with the Watches in the ScheduleTriggerEngine production code, which can lead to watches failing to run. This commit: 1. Changes TimeWarp to try to run the watch on all schedulers, rather than stopping after one which claims to have the watch registered. This reduces the impact of desynchronization between the mocking code and the backing production code. 2. Makes ScheduleTriggerEngineMock respect pauses of execution again. This is necessary to prevent duplicate watch invocations due to the above change. 3. Tweaks how watches are registered in ScheduleTriggerEngineMock to prevent race conditions due to concurrent modification. 4. Tweaks WatcherConcreteIndexTests to use TimeWarp instead of waiting for watches to be triggered, as TimeWarp is more reliable and accomplishes the same goal.
1 parent bba79b4 commit e1119ad

10 files changed

+37
-33
lines changed

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherConcreteIndexTests.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,14 @@
2323

2424
public class WatcherConcreteIndexTests extends AbstractWatcherIntegrationTestCase {
2525

26-
@Override
27-
protected boolean timeWarped() {
28-
return false;
29-
}
30-
3126
public void testCanUseAnyConcreteIndexName() throws Exception {
3227
String newWatcherIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
3328
String watchResultsIndex = randomAlphaOfLength(11).toLowerCase(Locale.ROOT);
3429
createIndex(watchResultsIndex);
3530

3631
stopWatcher();
3732
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName, Watch.DOC_TYPE);
33+
ensureGreen(newWatcherIndexName);
3834
startWatcher();
3935

4036
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("mywatch").setSource(watchBuilder()
@@ -45,6 +41,9 @@ public void testCanUseAnyConcreteIndexName() throws Exception {
4541
.get();
4642

4743
assertTrue(putWatchResponse.isCreated());
44+
refresh();
45+
46+
timeWarp().trigger("mywatch");
4847

4948
assertBusy(() -> {
5049
SearchResponse searchResult = client().prepareSearch(watchResultsIndex).setTrackTotalHits(true).get();

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookHttpsIntegrationTests.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
1616
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
1717
import org.elasticsearch.xpack.watcher.actions.ActionBuilders;
18+
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
1819
import org.elasticsearch.xpack.watcher.common.http.HttpMethod;
1920
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
2021
import org.elasticsearch.xpack.watcher.common.http.Scheme;
21-
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
2222
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
2323
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
2424
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
@@ -67,7 +67,6 @@ public void stopWebservice() throws Exception {
6767
webServer.close();
6868
}
6969

70-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503")
7170
public void testHttps() throws Exception {
7271
webServer.enqueue(new MockResponse().setResponseCode(200).setBody("body"));
7372
HttpRequestTemplate.Builder builder = HttpRequestTemplate.builder("localhost", webServer.getPort())

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookIntegrationTests.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.watcher.actions.webhook;
77

8-
import org.apache.lucene.util.LuceneTestCase;
98
import org.elasticsearch.action.get.GetResponse;
109
import org.elasticsearch.action.search.SearchResponse;
1110
import org.elasticsearch.common.settings.Settings;
@@ -19,9 +18,9 @@
1918
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
2019
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
2120
import org.elasticsearch.xpack.watcher.actions.ActionBuilders;
21+
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
2222
import org.elasticsearch.xpack.watcher.common.http.HttpMethod;
2323
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
24-
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
2524
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
2625
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
2726
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
@@ -45,7 +44,6 @@
4544
import static org.hamcrest.Matchers.is;
4645
import static org.hamcrest.Matchers.notNullValue;
4746

48-
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503")
4947
public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase {
5048

5149
private MockWebServer webServer = new MockWebServer();

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/input/http/HttpInputIntegrationTests.java

-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ public void testHttpInput() throws Exception {
8181
assertWatchWithMinimumPerformedActionsCount("_name", 1, false);
8282
}
8383

84-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40682")
8584
public void testHttpInputClusterStats() throws Exception {
8685
InetSocketAddress address = internalCluster().httpAddresses()[0];
8786
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_name")

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@
7373
import java.util.Collections;
7474
import java.util.HashSet;
7575
import java.util.List;
76-
import java.util.Map;
7776
import java.util.Locale;
77+
import java.util.Map;
7878
import java.util.Set;
7979
import java.util.concurrent.TimeUnit;
8080
import java.util.concurrent.atomic.AtomicReference;
@@ -611,9 +611,14 @@ public ClockMock clock() {
611611
}
612612

613613
public void trigger(String watchId, int times, TimeValue timeValue) {
614-
boolean isTriggered = schedulers.stream().anyMatch(scheduler -> scheduler.trigger(watchId, times, timeValue));
615-
String msg = String.format(Locale.ROOT, "could not find watch [%s] to trigger", watchId);
616-
assertThat(msg, isTriggered, is(true));
614+
long triggeredCount = schedulers.stream()
615+
.filter(scheduler -> scheduler.trigger(watchId, times, timeValue))
616+
.count();
617+
String msg = String.format(Locale.ROOT, "watch was triggered on [%d] schedulers, expected [1]", triggeredCount);
618+
if (triggeredCount > 1) {
619+
logger.warn(msg);
620+
}
621+
assertThat(msg, triggeredCount, greaterThanOrEqualTo(1L));
617622
}
618623
}
619624

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java

-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.watcher.test.integration;
77

8-
import org.apache.lucene.util.LuceneTestCase;
98
import org.elasticsearch.ElasticsearchParseException;
109
import org.elasticsearch.action.search.SearchResponse;
1110
import org.elasticsearch.action.search.SearchType;
@@ -63,7 +62,6 @@
6362

6463
@TestLogging("org.elasticsearch.xpack.watcher:DEBUG," +
6564
"org.elasticsearch.xpack.watcher.WatcherIndexingListener:TRACE")
66-
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503")
6765
public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
6866

6967
public void testIndexWatch() throws Exception {

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HttpSecretsIntegrationTests.java

-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
8787
return super.nodeSettings(nodeOrdinal);
8888
}
8989

90-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40587")
9190
public void testHttpInput() throws Exception {
9291
WatcherClient watcherClient = watcherClient();
9392
watcherClient.preparePutWatch("_id")

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java

-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ public void testAckSingleAction() throws Exception {
122122
assertThat(throttledCount, greaterThan(0L));
123123
}
124124

125-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35506")
126125
public void testAckAllActions() throws Exception {
127126
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch()
128127
.setId("_id")

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchMetadataTests.java

-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838

3939
public class WatchMetadataTests extends AbstractWatcherIntegrationTestCase {
4040

41-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40631")
4241
public void testWatchMetadata() throws Exception {
4342
Map<String, Object> metadata = new HashMap<>();
4443
metadata.put("foo", "bar");

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java

+22-13
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import java.time.Clock;
2222
import java.util.Collection;
2323
import java.util.Collections;
24+
import java.util.Map;
2425
import java.util.concurrent.ConcurrentHashMap;
25-
import java.util.concurrent.ConcurrentMap;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.concurrent.atomic.AtomicReference;
2628

2729
/**
2830
* A mock scheduler to help with unit testing. Provide {@link ScheduleTriggerEngineMock#trigger} method to manually trigger
@@ -31,7 +33,8 @@
3133
public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
3234
private static final Logger logger = LogManager.getLogger(ScheduleTriggerEngineMock.class);
3335

34-
private final ConcurrentMap<String, Watch> watches = new ConcurrentHashMap<>();
36+
private final AtomicReference<Map<String, Watch>> watches = new AtomicReference<>(new ConcurrentHashMap<>());
37+
private final AtomicBoolean paused = new AtomicBoolean(false);
3538

3639
public ScheduleTriggerEngineMock(ScheduleRegistry scheduleRegistry, Clock clock) {
3740
super(scheduleRegistry, clock);
@@ -49,38 +52,44 @@ public ScheduleTriggerEvent parseTriggerEvent(TriggerService service, String wat
4952
}
5053

5154
@Override
52-
public void start(Collection<Watch> jobs) {
53-
jobs.forEach(this::add);
55+
public synchronized void start(Collection<Watch> jobs) {
56+
Map<String, Watch> newWatches = new ConcurrentHashMap<>();
57+
jobs.forEach((watch) -> newWatches.put(watch.id(), watch));
58+
watches.set(newWatches);
59+
paused.set(false);
5460
}
5561

5662
@Override
5763
public void stop() {
58-
watches.clear();
64+
watches.set(new ConcurrentHashMap<>());
5965
}
6066

6167
@Override
62-
public void add(Watch watch) {
68+
public synchronized void add(Watch watch) {
6369
logger.debug("adding watch [{}]", watch.id());
64-
watches.put(watch.id(), watch);
70+
watches.get().put(watch.id(), watch);
6571
}
6672

6773
@Override
6874
public void pauseExecution() {
69-
// No action is needed because this engine does not trigger watches on a schedule (instead
70-
// they must be triggered manually).
75+
paused.set(true);
7176
}
7277

7378
@Override
74-
public boolean remove(String jobId) {
75-
return watches.remove(jobId) != null;
79+
public synchronized boolean remove(String jobId) {
80+
return watches.get().remove(jobId) != null;
7681
}
7782

7883
public boolean trigger(String jobName) {
7984
return trigger(jobName, 1, null);
8085
}
8186

8287
public boolean trigger(String jobName, int times, TimeValue interval) {
83-
if (watches.containsKey(jobName) == false) {
88+
if (watches.get().containsKey(jobName) == false) {
89+
return false;
90+
}
91+
if (paused.get()) {
92+
logger.info("not executing watch [{}] on this scheduler because it is paused", jobName);
8493
return false;
8594
}
8695

@@ -89,7 +98,7 @@ public boolean trigger(String jobName, int times, TimeValue interval) {
8998
logger.debug("firing watch [{}] at [{}]", jobName, now);
9099
ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now);
91100
consumers.forEach(consumer -> consumer.accept(Collections.singletonList(event)));
92-
if (interval != null) {
101+
if (interval != null) {
93102
if (clock instanceof ClockMock) {
94103
((ClockMock) clock).fastForward(interval);
95104
} else {

0 commit comments

Comments
 (0)