Skip to content

Commit fd67002

Browse files
committed
Ensure that .watcher-history-11* template is in installed prior to use
[WatcherIndexTemplateRegistry](https://github.com/elastic/elasticsearch/blob/7.7/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java#L74) as of elastic#52962 requires all nodes to be on 7.7.0 before it allows the version 11 index template to be installed. While in a mixed cluster, nothing prevents Watcher from running on the new host before the all of the nodes are on 7.7.0. This will result in the .watcher-history-11* index without the proper mappings. Without the proper mapping a single document (for a large watch) can exceed the default 1000 field limit and cause error to show in the logs. This commit ensures the same logic for writing to the index is applied as for installing the template. In a mixed cluster, the `10` index template will continue to be written. Only once all of nodes are on 7.7.0+ will the `11` index template be installed and used. Note - this PR targets 7.x and will be back ported to 7.7.next. This conditional installation of the template is not present in master. closes elastic#56732
1 parent 7c8860b commit fd67002

File tree

11 files changed

+105
-46
lines changed

11 files changed

+105
-46
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private void addTemplatesIfMissing(ClusterState state) {
154154
if (creationCheck.compareAndSet(false, true)) {
155155
IndexTemplateMetadata currentTemplate = state.metadata().getTemplates().get(templateName);
156156
if (Objects.isNull(currentTemplate)) {
157-
logger.debug("adding index template [{}] for [{}], because it doesn't exist", templateName, getOrigin());
157+
logger.info("adding index template [{}] for [{}], because it doesn't exist", templateName, getOrigin());
158158
putTemplate(newTemplate, creationCheck);
159159
} else if (Objects.isNull(currentTemplate.getVersion()) || newTemplate.getVersion() > currentTemplate.getVersion()) {
160160
// IndexTemplateConfig now enforces templates contain a `version` property, so if the template doesn't have one we can

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/history/HistoryStoreField.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
*/
66
package org.elasticsearch.xpack.core.watcher.history;
77

8+
import org.elasticsearch.Version;
9+
import org.elasticsearch.cluster.ClusterState;
810
import org.elasticsearch.common.time.DateFormatter;
911
import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField;
1012

@@ -14,12 +16,19 @@ public final class HistoryStoreField {
1416

1517
public static final String INDEX_PREFIX = ".watcher-history-";
1618
public static final String INDEX_PREFIX_WITH_TEMPLATE = INDEX_PREFIX + WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION + "-";
19+
public static final String INDEX_PREFIX_WITH_TEMPLATE_10 = INDEX_PREFIX +
20+
WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION_10 + "-";
1721
private static final DateFormatter indexTimeFormat = DateFormatter.forPattern("yyyy.MM.dd");
1822

1923
/**
2024
* Calculates the correct history index name for a given time
2125
*/
22-
public static String getHistoryIndexNameForTime(ZonedDateTime time) {
23-
return INDEX_PREFIX_WITH_TEMPLATE + indexTimeFormat.format(time);
26+
public static String getHistoryIndexNameForTime(ZonedDateTime time, ClusterState state) {
27+
//null only allowed for testing
28+
if (state == null || state.nodes().getMinNodeVersion().onOrAfter(Version.V_7_7_0)) {
29+
return INDEX_PREFIX_WITH_TEMPLATE + indexTimeFormat.format(time);
30+
} else {
31+
return INDEX_PREFIX_WITH_TEMPLATE_10 + indexTimeFormat.format(time);
32+
}
2433
}
2534
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ public final class WatcherIndexTemplateRegistryField {
1818
// version 11: watch history indices are hidden
1919
// Note: if you change this, also inform the kibana team around the watcher-ui
2020
public static final int INDEX_TEMPLATE_VERSION = 11;
21+
public static final int INDEX_TEMPLATE_VERSION_10 = 10;
2122
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;
22-
public static final String HISTORY_TEMPLATE_NAME_10 = ".watch-history-10";
23+
public static final String HISTORY_TEMPLATE_NAME_10 = ".watch-history-" + INDEX_TEMPLATE_VERSION_10;
2324
public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION;
2425
public static final String HISTORY_TEMPLATE_NAME_NO_ILM_10 = ".watch-history-no-ilm-10";
2526
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1395,7 +1395,7 @@ public void testWatcherAdminRole() {
13951395
assertThat(role.indices().allowedIndicesMatcher(IndexAction.NAME).test("foo"), is(false));
13961396

13971397
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
1398-
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now);
1398+
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now, null);
13991399
for (String index : new String[]{ Watch.INDEX, historyIndex, TriggeredWatchStoreField.INDEX_NAME }) {
14001400
assertOnlyReadAllowed(role, index);
14011401
}
@@ -1429,7 +1429,7 @@ public void testWatcherUserRole() {
14291429
assertThat(role.indices().allowedIndicesMatcher(IndexAction.NAME).test(TriggeredWatchStoreField.INDEX_NAME), is(false));
14301430

14311431
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
1432-
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now);
1432+
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now, null);
14331433
for (String index : new String[]{ Watch.INDEX, historyIndex }) {
14341434
assertOnlyReadAllowed(role, index);
14351435
}

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,6 @@ public Watcher(final Settings settings) {
241241
this.settings = settings;
242242
this.transportClient = XPackPlugin.transportClientMode(settings);
243243
this.enabled = XPackSettings.WATCHER_ENABLED.get(settings);
244-
245-
if (enabled && transportClient == false) {
246-
validAutoCreateIndex(settings, logger);
247-
}
248244
}
249245

250246
// overridable by tests
@@ -259,6 +255,10 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
259255
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
260256
IndexNameExpressionResolver expressionResolver,
261257
Supplier<RepositoriesService> repositoriesServiceSupplier) {
258+
if (enabled && transportClient == false) {
259+
validAutoCreateIndex(settings, logger, clusterService);
260+
}
261+
262262
if (enabled == false) {
263263
return Collections.emptyList();
264264
}
@@ -386,7 +386,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
386386
.setConcurrentRequests(SETTING_BULK_CONCURRENT_REQUESTS.get(settings))
387387
.build();
388388

389-
HistoryStore historyStore = new HistoryStore(bulkProcessor);
389+
HistoryStore historyStore = new HistoryStore(bulkProcessor, clusterService);
390390

391391
// schedulers
392392
final Set<Schedule.Parser> scheduleParsers = new HashSet<>();
@@ -601,7 +601,7 @@ public void onIndexModule(IndexModule module) {
601601
module.addIndexOperationListener(listener);
602602
}
603603

604-
static void validAutoCreateIndex(Settings settings, Logger logger) {
604+
static void validAutoCreateIndex(Settings settings, Logger logger, ClusterService clusterService) {
605605
String value = settings.get("action.auto_create_index");
606606
if (value == null) {
607607
return;
@@ -623,14 +623,14 @@ static void validAutoCreateIndex(Settings settings, Logger logger) {
623623
indices.add(".watches");
624624
indices.add(".triggered_watches");
625625
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
626-
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now));
627-
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusDays(1)));
628-
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(1)));
629-
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(2)));
630-
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(3)));
631-
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(4)));
632-
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(5)));
633-
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(6)));
626+
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now, clusterService.state()));
627+
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusDays(1), clusterService.state()));
628+
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(1), clusterService.state()));
629+
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(2), clusterService.state()));
630+
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(3), clusterService.state()));
631+
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(4), clusterService.state()));
632+
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(5), clusterService.state()));
633+
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(6), clusterService.state()));
634634
for (String index : indices) {
635635
boolean matched = false;
636636
for (String match : matches) {

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch trigge
452452
* Any existing watchRecord will be overwritten.
453453
*/
454454
private void forcePutHistory(WatchRecord watchRecord) {
455-
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
455+
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterService.state());
456456
try {
457457
try (XContentBuilder builder = XContentFactory.jsonBuilder();
458458
ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) {

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.index.IndexRequest;
1414
import org.elasticsearch.cluster.ClusterState;
1515
import org.elasticsearch.cluster.metadata.IndexMetadata;
16+
import org.elasticsearch.cluster.service.ClusterService;
1617
import org.elasticsearch.common.xcontent.XContentBuilder;
1718
import org.elasticsearch.common.xcontent.XContentFactory;
1819
import org.elasticsearch.xpack.core.watcher.history.HistoryStoreField;
@@ -31,17 +32,19 @@ public class HistoryStore {
3132
private static final Logger logger = LogManager.getLogger(HistoryStore.class);
3233

3334
private final BulkProcessor bulkProcessor;
35+
private final ClusterService clusterService;
3436

35-
public HistoryStore(BulkProcessor bulkProcessor) {
37+
public HistoryStore(BulkProcessor bulkProcessor, ClusterService clusterService) {
3638
this.bulkProcessor = bulkProcessor;
39+
this.clusterService = clusterService;
3740
}
3841

3942
/**
4043
* Stores the specified watchRecord.
4144
* If the specified watchRecord already was stored this call will fail with a version conflict.
4245
*/
4346
public void put(WatchRecord watchRecord) throws Exception {
44-
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
47+
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterService.state());
4548
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
4649
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);
4750

@@ -58,7 +61,7 @@ public void put(WatchRecord watchRecord) throws Exception {
5861
* Any existing watchRecord will be overwritten.
5962
*/
6063
public void forcePut(WatchRecord watchRecord) {
61-
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
64+
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterService.state());
6265
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
6366
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);
6467

@@ -78,7 +81,7 @@ public void forcePut(WatchRecord watchRecord) {
7881
* @return true, if history store is ready to be started
7982
*/
8083
public static boolean validate(ClusterState state) {
81-
String currentIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC));
84+
String currentIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC), state);
8285
IndexMetadata indexMetadata = WatchStoreUtils.getConcreteIndex(currentIndex, state.metadata());
8386
return indexMetadata == null || (indexMetadata.getState() == IndexMetadata.State.OPEN &&
8487
state.routingTable().index(indexMetadata.getIndex()).allPrimaryShardsActive());

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
*/
66
package org.elasticsearch.xpack.watcher;
77

8+
import org.elasticsearch.Version;
9+
import org.elasticsearch.cluster.ClusterState;
810
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
11+
import org.elasticsearch.cluster.node.DiscoveryNodes;
12+
import org.elasticsearch.cluster.service.ClusterService;
913
import org.elasticsearch.common.settings.Setting;
1014
import org.elasticsearch.common.settings.Settings;
1115
import org.elasticsearch.env.TestEnvironment;
@@ -19,7 +23,9 @@
1923
import org.elasticsearch.xpack.core.XPackSettings;
2024
import org.elasticsearch.xpack.core.watcher.watch.Watch;
2125
import org.elasticsearch.xpack.watcher.notification.NotificationService;
26+
import org.junit.Before;
2227

28+
import java.util.Arrays;
2329
import java.util.Collections;
2430
import java.util.List;
2531

@@ -31,33 +37,52 @@
3137
import static org.mockito.Mockito.times;
3238
import static org.mockito.Mockito.verify;
3339
import static org.mockito.Mockito.verifyNoMoreInteractions;
40+
import static org.mockito.Mockito.when;
3441

3542
public class WatcherPluginTests extends ESTestCase {
3643

44+
private ClusterService clusterService;
45+
private ClusterState clusterState;
46+
private DiscoveryNodes discoveryNodes;
47+
48+
@Before
49+
public void init() {
50+
clusterService = mock(ClusterService.class);
51+
clusterState = mock(ClusterState.class);
52+
discoveryNodes = mock(DiscoveryNodes.class);
53+
when(clusterService.state()).thenReturn(clusterState);
54+
when(clusterState.nodes()).thenReturn(discoveryNodes);
55+
when(discoveryNodes.getMinNodeVersion()).thenReturn(randomFrom(Arrays.asList(Version.V_7_0_0, Version.V_7_7_0)));
56+
57+
}
58+
3759
public void testValidAutoCreateIndex() {
38-
Watcher.validAutoCreateIndex(Settings.EMPTY, logger);
39-
Watcher.validAutoCreateIndex(Settings.builder().put("action.auto_create_index", true).build(), logger);
60+
Watcher.validAutoCreateIndex(Settings.EMPTY, logger, clusterService);
61+
Watcher.validAutoCreateIndex(Settings.builder().put("action.auto_create_index", true).build(), logger, clusterService);
4062

4163
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
42-
() -> Watcher.validAutoCreateIndex(Settings.builder().put("action.auto_create_index", false).build(), logger));
64+
() -> Watcher.validAutoCreateIndex(Settings.builder().put("action.auto_create_index", false).build(), logger,
65+
clusterService));
4366
assertThat(exception.getMessage(), containsString("[.watches,.triggered_watches,.watcher-history-*]"));
4467

4568
Watcher.validAutoCreateIndex(Settings.builder().put("action.auto_create_index",
46-
".watches,.triggered_watches,.watcher-history*").build(), logger);
47-
Watcher.validAutoCreateIndex(Settings.builder().put("action.auto_create_index", "*w*").build(), logger);
48-
Watcher.validAutoCreateIndex(Settings.builder().put("action.auto_create_index", ".w*,.t*").build(), logger);
69+
".watches,.triggered_watches,.watcher-history*").build(), logger, clusterService);
70+
Watcher.validAutoCreateIndex(Settings.builder().put("action.auto_create_index", "*w*").build(), logger, clusterService);
71+
Watcher.validAutoCreateIndex(Settings.builder().put("action.auto_create_index", ".w*,.t*").build(), logger, clusterService);
4972

5073
exception = expectThrows(IllegalArgumentException.class,
51-
() -> Watcher.validAutoCreateIndex(Settings.builder().put("action.auto_create_index", ".watches").build(), logger));
74+
() -> Watcher.validAutoCreateIndex(Settings.builder().put("action.auto_create_index", ".watches").build(), logger,
75+
clusterService));
5276
assertThat(exception.getMessage(), containsString("[.watches,.triggered_watches,.watcher-history-*]"));
5377

5478
exception = expectThrows(IllegalArgumentException.class,
55-
() -> Watcher.validAutoCreateIndex(Settings.builder().put("action.auto_create_index", ".triggered_watch").build(), logger));
79+
() -> Watcher.validAutoCreateIndex(Settings.builder().put("action.auto_create_index", ".triggered_watch").build(), logger,
80+
clusterService));
5681
assertThat(exception.getMessage(), containsString("[.watches,.triggered_watches,.watcher-history-*]"));
5782

5883
exception = expectThrows(IllegalArgumentException.class,
5984
() -> Watcher.validAutoCreateIndex(Settings.builder().put("action.auto_create_index", ".watcher-history-*").build(),
60-
logger));
85+
logger, clusterService));
6186
assertThat(exception.getMessage(), containsString("[.watches,.triggered_watches,.watcher-history-*]"));
6287
}
6388

@@ -130,7 +155,6 @@ public void testReload() {
130155
.build();
131156
NotificationService mockService = mock(NotificationService.class);
132157
Watcher watcher = new TestWatcher(settings, mockService);
133-
134158
watcher.reload(settings);
135159
verify(mockService, times(1)).reload(settings);
136160
}

0 commit comments

Comments
 (0)