Skip to content

Commit d79cd8c

Browse files
authored
Use any index specified by .watches for Watcher (#39541)
Previously, Watcher only attached its listener to indices that started with the prefix `.watches`, which causes Watcher to silently fail to schedule newly created Watches if the `.watches` alias is redirected to an index that does not start with `.watches`. Watcher now attaches the listener to all indices, so that Watcher can respond to changes in which index has the `.watches` alias. Also adjusts the tests to randomly use non-prefixed concrete indices for .watches and .triggered_watches.
1 parent ff6ffe8 commit d79cd8c

File tree

3 files changed

+111
-12
lines changed

3 files changed

+111
-12
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -574,11 +574,9 @@ public void onIndexModule(IndexModule module) {
574574
}
575575

576576
assert listener != null;
577-
// for now, we only add this index operation listener to indices starting with .watches
578-
// this also means, that aliases pointing to this index have to follow this notation
579-
if (module.getIndex().getName().startsWith(Watch.INDEX)) {
580-
module.addIndexOperationListener(listener);
581-
}
577+
// Attach a listener to every index so that we can react to alias changes.
578+
// This listener will be a no-op except on the index pointed to by .watches
579+
module.addIndexOperationListener(listener);
582580
}
583581

584582
static void validAutoCreateIndex(Settings settings, Logger logger) {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.watcher;
8+
9+
import org.elasticsearch.action.search.SearchResponse;
10+
import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse;
11+
import org.elasticsearch.xpack.core.watcher.watch.Watch;
12+
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
13+
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
14+
15+
import java.util.Locale;
16+
17+
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction;
18+
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
19+
import static org.elasticsearch.xpack.watcher.input.InputBuilders.noneInput;
20+
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
21+
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
22+
import static org.hamcrest.Matchers.greaterThan;
23+
24+
public class WatcherConcreteIndexTests extends AbstractWatcherIntegrationTestCase {
25+
26+
@Override
27+
protected boolean timeWarped() {
28+
return false;
29+
}
30+
31+
public void testCanUseAnyConcreteIndexName() throws Exception {
32+
String newWatcherIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
33+
String watchResultsIndex = randomAlphaOfLength(11).toLowerCase(Locale.ROOT);
34+
createIndex(watchResultsIndex);
35+
36+
stopWatcher();
37+
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName, Watch.DOC_TYPE);
38+
startWatcher();
39+
40+
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("mywatch").setSource(watchBuilder()
41+
.trigger(schedule(interval("3s")))
42+
.input(noneInput())
43+
.condition(InternalAlwaysCondition.INSTANCE)
44+
.addAction("indexer", indexAction(watchResultsIndex, "_doc")))
45+
.get();
46+
47+
assertTrue(putWatchResponse.isCreated());
48+
49+
assertBusy(() -> {
50+
SearchResponse searchResult = client().prepareSearch(watchResultsIndex).setTrackTotalHits(true).get();
51+
assertThat((int) searchResult.getHits().getTotalHits().value, greaterThan(0));
52+
});
53+
}
54+
}

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,17 @@
66
package org.elasticsearch.xpack.watcher.test;
77

88
import org.elasticsearch.action.admin.indices.alias.Alias;
9+
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
910
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
11+
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
1012
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
1113
import org.elasticsearch.action.search.SearchRequestBuilder;
1214
import org.elasticsearch.action.search.SearchResponse;
1315
import org.elasticsearch.action.support.IndicesOptions;
1416
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
1517
import org.elasticsearch.cluster.ClusterState;
1618
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
19+
import org.elasticsearch.cluster.metadata.MappingMetaData;
1720
import org.elasticsearch.cluster.routing.IndexRoutingTable;
1821
import org.elasticsearch.common.collect.Tuple;
1922
import org.elasticsearch.common.network.NetworkModule;
@@ -194,7 +197,7 @@ public void _setup() throws Exception {
194197
internalCluster().setDisruptionScheme(ice);
195198
ice.startDisrupting();
196199
}
197-
200+
stopWatcher();
198201
createWatcherIndicesOrAliases();
199202
startWatcher();
200203
}
@@ -221,13 +224,19 @@ private void createWatcherIndicesOrAliases() throws Exception {
221224
// alias for .watches, setting the index template to the same as well
222225
String watchIndexName;
223226
String triggeredWatchIndexName;
224-
if (rarely()) {
225-
watchIndexName = ".watches-alias-index";
226-
CreateIndexResponse response = client().admin().indices().prepareCreate(watchIndexName)
227+
if (randomBoolean()) {
228+
// Create an index to get the template
229+
String tempIndex = ".watches" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
230+
CreateIndexResponse response = client().admin().indices().prepareCreate(tempIndex)
227231
.setCause("Index to test aliases with .watches index")
228232
.addAlias(new Alias(Watch.INDEX))
229233
.get();
230234
assertAcked(response);
235+
236+
// Now replace it with a randomly named index
237+
watchIndexName = randomAlphaOfLengthBetween(5,10).toLowerCase(Locale.ROOT);
238+
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, watchIndexName, Watch.DOC_TYPE);
239+
231240
logger.info("set alias for .watches index to [{}]", watchIndexName);
232241
} else {
233242
watchIndexName = Watch.INDEX;
@@ -239,13 +248,19 @@ private void createWatcherIndicesOrAliases() throws Exception {
239248
}
240249

241250
// alias for .triggered-watches, ensuring the index template is set appropriately
242-
if (rarely()) {
243-
triggeredWatchIndexName = ".triggered_watches-alias-index";
244-
CreateIndexResponse response = client().admin().indices().prepareCreate(triggeredWatchIndexName)
251+
if (randomBoolean()) {
252+
String tempIndex = ".triggered_watches-alias-index";
253+
CreateIndexResponse response = client().admin().indices().prepareCreate(tempIndex)
245254
.setCause("Index to test aliases with .triggered-watches index")
246255
.addAlias(new Alias(TriggeredWatchStoreField.INDEX_NAME))
247256
.get();
248257
assertAcked(response);
258+
259+
// Now replace it with a randomly-named index
260+
triggeredWatchIndexName = randomValueOtherThan(watchIndexName,
261+
() -> randomAlphaOfLengthBetween(5,10).toLowerCase(Locale.ROOT));
262+
replaceWatcherIndexWithRandomlyNamedIndex(TriggeredWatchStoreField.INDEX_NAME, triggeredWatchIndexName,
263+
TriggeredWatchStoreField.DOC_TYPE);
249264
logger.info("set alias for .triggered-watches index to [{}]", triggeredWatchIndexName);
250265
} else {
251266
triggeredWatchIndexName = TriggeredWatchStoreField.INDEX_NAME;
@@ -259,6 +274,38 @@ private void createWatcherIndicesOrAliases() throws Exception {
259274
}
260275
}
261276

277+
public void replaceWatcherIndexWithRandomlyNamedIndex(String originalIndexOrAlias, String to, String docType) {
278+
GetIndexResponse index = client().admin().indices().prepareGetIndex().setIndices(originalIndexOrAlias).get();
279+
MappingMetaData mapping = index.getMappings().get(index.getIndices()[0]).get(docType);
280+
281+
Settings settings = index.getSettings().get(index.getIndices()[0]);
282+
Settings.Builder newSettings = Settings.builder().put(settings);
283+
newSettings.remove("index.provided_name");
284+
newSettings.remove("index.uuid");
285+
newSettings.remove("index.creation_date");
286+
newSettings.remove("index.version.created");
287+
288+
CreateIndexResponse createIndexResponse = client().admin().indices().prepareCreate(to)
289+
.addMapping(docType, mapping.sourceAsMap())
290+
.setSettings(newSettings)
291+
.get();
292+
assertTrue(createIndexResponse.isAcknowledged());
293+
ensureGreen(to);
294+
295+
AtomicReference<String> originalIndex = new AtomicReference<>(originalIndexOrAlias);
296+
boolean watchesIsAlias = client().admin().indices().prepareAliasesExist(originalIndexOrAlias).get().isExists();
297+
if (watchesIsAlias) {
298+
GetAliasesResponse aliasesResponse = client().admin().indices().prepareGetAliases(originalIndexOrAlias).get();
299+
assertEquals(1, aliasesResponse.getAliases().size());
300+
aliasesResponse.getAliases().forEach((aliasRecord) -> {
301+
assertEquals(1, aliasRecord.value.size());
302+
originalIndex.set(aliasRecord.key);
303+
});
304+
}
305+
client().admin().indices().prepareDelete(originalIndex.get()).get();
306+
client().admin().indices().prepareAliases().addAlias(to, originalIndexOrAlias).get();
307+
}
308+
262309
protected TimeWarp timeWarp() {
263310
assert timeWarped() : "cannot access TimeWarp when test context is not time warped";
264311
return timeWarp;

0 commit comments

Comments
 (0)