Skip to content

Commit 213e06a

Browse files
authored
[Transform] Make sure remote clusters are at least 7.12 if transform uses search runtime fields (#69170) (#69510)
1 parent e891d5e commit 213e06a

File tree

10 files changed

+341
-9
lines changed

10 files changed

+341
-9
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidator.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.core.common.validation;
99

10+
import org.elasticsearch.Version;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.ActionRequest;
1213
import org.elasticsearch.action.ActionRequestValidationException;
@@ -33,11 +34,16 @@
3334
import java.util.Arrays;
3435
import java.util.List;
3536
import java.util.Locale;
37+
import java.util.Map;
3638
import java.util.Set;
3739
import java.util.SortedSet;
3840
import java.util.TreeSet;
3941
import java.util.stream.Collectors;
4042

43+
import static java.util.Map.Entry.comparingByKey;
44+
import static java.util.function.Function.identity;
45+
import static java.util.stream.Collectors.joining;
46+
import static java.util.stream.Collectors.toMap;
4147
import static org.elasticsearch.action.ValidateActions.addValidationError;
4248
import static org.elasticsearch.cluster.metadata.MetadataCreateIndexService.validateIndexOrAliasName;
4349

@@ -64,6 +70,8 @@ public final class SourceDestValidator {
6470
public static final String REMOTE_CLUSTER_LICENSE_INACTIVE = "License check failed for remote cluster "
6571
+ "alias [{0}], license is not active";
6672
public static final String REMOTE_SOURCE_INDICES_NOT_SUPPORTED = "remote source indices are not supported";
73+
public static final String REMOTE_CLUSTERS_TOO_OLD =
74+
"remote clusters are expected to run at least version [{0}] (reason: [{1}]), but the following clusters were too old: [{2}]";
6775
public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";
6876

6977
// workaround for 7.x: remoteClusterAliases does not throw
@@ -216,6 +224,11 @@ public Set<String> getRegisteredRemoteClusterNames() {
216224
return remoteClusterService.getRegisteredRemoteClusterNames();
217225
}
218226

227+
// convenience method to make testing easier
228+
public Version getRemoteClusterVersion(String cluster) {
229+
return remoteClusterService.getConnection(cluster).getVersion();
230+
}
231+
219232
private void resolveLocalAndRemoteSource() {
220233
resolvedSource = new TreeSet<>(Arrays.asList(source));
221234
resolvedRemoteSource = new TreeSet<>(RemoteClusterLicenseChecker.remoteIndices(resolvedSource));
@@ -425,6 +438,59 @@ public void validate(Context context, ActionListener<Context> listener) {
425438
}
426439
}
427440

441+
public static class RemoteClusterMinimumVersionValidation implements SourceDestValidation {
442+
443+
private final Version minExpectedVersion;
444+
private final String reason;
445+
446+
public RemoteClusterMinimumVersionValidation(Version minExpectedVersion, String reason) {
447+
this.minExpectedVersion = minExpectedVersion;
448+
this.reason = reason;
449+
}
450+
451+
public Version getMinExpectedVersion() {
452+
return minExpectedVersion;
453+
}
454+
455+
public String getReason() {
456+
return reason;
457+
}
458+
459+
@Override
460+
public void validate(Context context, ActionListener<Context> listener) {
461+
List<String> remoteIndices = new ArrayList<>(context.resolveRemoteSource());
462+
Map<String, Version> remoteClusterVersions;
463+
try {
464+
List<String> remoteAliases =
465+
RemoteClusterLicenseChecker.remoteClusterAliases(context.getRegisteredRemoteClusterNames(), remoteIndices);
466+
remoteClusterVersions = remoteAliases.stream().collect(toMap(identity(), context::getRemoteClusterVersion));
467+
} catch (NoSuchRemoteClusterException e) {
468+
context.addValidationError(e.getMessage());
469+
listener.onResponse(context);
470+
return;
471+
} catch (Exception e) {
472+
context.addValidationError(ERROR_REMOTE_CLUSTER_SEARCH, e.getMessage());
473+
listener.onResponse(context);
474+
return;
475+
}
476+
Map<String, Version> oldRemoteClusterVersions =
477+
remoteClusterVersions.entrySet().stream()
478+
.filter(entry -> entry.getValue().before(minExpectedVersion))
479+
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
480+
if (oldRemoteClusterVersions.isEmpty() == false) {
481+
context.addValidationError(
482+
REMOTE_CLUSTERS_TOO_OLD,
483+
minExpectedVersion,
484+
reason,
485+
oldRemoteClusterVersions.entrySet().stream()
486+
.sorted(comparingByKey()) // sort to have a deterministic order among clusters in the resulting string
487+
.map(e -> e.getKey() + " (" + e.getValue() + ")")
488+
.collect(joining(", ")));
489+
}
490+
listener.onResponse(context);
491+
}
492+
}
493+
428494
static class DestinationInSourceValidation implements SourceDestValidation {
429495

430496
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.elasticsearch.common.xcontent.XContentBuilder;
2424
import org.elasticsearch.common.xcontent.XContentParser;
2525
import org.elasticsearch.xpack.core.common.time.TimeUtils;
26+
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
27+
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SourceDestValidation;
2628
import org.elasticsearch.xpack.core.transform.TransformField;
2729
import org.elasticsearch.xpack.core.transform.TransformMessages;
2830
import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfig;
@@ -32,6 +34,7 @@
3234
import java.io.IOException;
3335
import java.time.Instant;
3436
import java.util.Collections;
37+
import java.util.List;
3538
import java.util.Map;
3639
import java.util.Objects;
3740

@@ -45,6 +48,8 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
4548

4649
public static final String NAME = "data_frame_transform_config";
4750
public static final ParseField HEADERS = new ParseField("headers");
51+
/** Version in which {@code FieldCapabilitiesRequest.runtime_fields} field was introduced. */
52+
private static final Version FIELD_CAPS_RUNTIME_MAPPINGS_INTRODUCED_VERSION = Version.V_7_12_0;
4853

4954
// types of transforms
5055
public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
@@ -308,6 +313,22 @@ public RetentionPolicyConfig getRetentionPolicyConfig() {
308313
return retentionPolicyConfig;
309314
}
310315

316+
/**
317+
* Determines the minimum version of a cluster in multi-cluster setup that is needed to successfully run this transform config.
318+
*
319+
* @return version
320+
*/
321+
public List<SourceDestValidation> getAdditionalValidations() {
322+
if ((source.getRuntimeMappings() == null || source.getRuntimeMappings().isEmpty()) == false) {
323+
SourceDestValidation validation =
324+
new SourceDestValidator.RemoteClusterMinimumVersionValidation(
325+
FIELD_CAPS_RUNTIME_MAPPINGS_INTRODUCED_VERSION, "source.runtime_mappings field was set");
326+
return Collections.singletonList(validation);
327+
} else {
328+
return Collections.emptyList();
329+
}
330+
}
331+
311332
public ActionRequestValidationException validate(ActionRequestValidationException validationException) {
312333
if (pivotConfig != null) {
313334
validationException = pivotConfig.validate(validationException);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.common.validation;
9+
10+
import org.elasticsearch.Version;
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.test.ESTestCase;
13+
import org.elasticsearch.transport.NoSuchRemoteClusterException;
14+
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.Context;
15+
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.RemoteClusterMinimumVersionValidation;
16+
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SourceDestValidation;
17+
import org.junit.Before;
18+
19+
import java.util.Arrays;
20+
import java.util.HashSet;
21+
import java.util.TreeSet;
22+
23+
import static java.util.Collections.emptySet;
24+
import static java.util.Collections.emptySortedSet;
25+
import static org.hamcrest.Matchers.contains;
26+
import static org.hamcrest.Matchers.equalTo;
27+
import static org.hamcrest.Matchers.is;
28+
import static org.hamcrest.Matchers.nullValue;
29+
import static org.mockito.Mockito.doReturn;
30+
import static org.mockito.Mockito.doThrow;
31+
import static org.mockito.Mockito.spy;
32+
33+
public class RemoteClusterMinimumVersionValidationTests extends ESTestCase {
34+
35+
private static final Version MIN_EXPECTED_VERSION = Version.V_7_11_0;
36+
private static final String REASON = "some reason";
37+
38+
private Context context;
39+
40+
@Before
41+
public void setUpMocks() {
42+
context = spy(new Context(null, null, null, null, null, null, null, null, null, null));
43+
doReturn(Version.V_7_10_2).when(context).getRemoteClusterVersion("cluster-A");
44+
doReturn(Version.V_7_11_0).when(context).getRemoteClusterVersion("cluster-B");
45+
doReturn(Version.V_7_11_2).when(context).getRemoteClusterVersion("cluster-C");
46+
}
47+
48+
public void testGetters() {
49+
RemoteClusterMinimumVersionValidation validation = new RemoteClusterMinimumVersionValidation(MIN_EXPECTED_VERSION, REASON);
50+
assertThat(validation.getMinExpectedVersion(), is(equalTo(MIN_EXPECTED_VERSION)));
51+
assertThat(validation.getReason(), is(equalTo(REASON)));
52+
}
53+
54+
public void testValidate_NoRemoteClusters() {
55+
doReturn(emptySet()).when(context).getRegisteredRemoteClusterNames();
56+
doReturn(emptySortedSet()).when(context).resolveRemoteSource();
57+
SourceDestValidation validation = new RemoteClusterMinimumVersionValidation(MIN_EXPECTED_VERSION, REASON);
58+
validation.validate(
59+
context,
60+
ActionListener.wrap(
61+
ctx -> assertThat(ctx.getValidationException(), is(nullValue())),
62+
e -> fail(e.getMessage())));
63+
}
64+
65+
public void testValidate_RemoteClustersVersionsOk() {
66+
doReturn(new HashSet<>(Arrays.asList("cluster-B", "cluster-C"))).when(context).getRegisteredRemoteClusterNames();
67+
doReturn(new TreeSet<>(Arrays.asList("cluster-B:dummy", "cluster-C:dummy"))).when(context).resolveRemoteSource();
68+
SourceDestValidation validation = new RemoteClusterMinimumVersionValidation(MIN_EXPECTED_VERSION, REASON);
69+
validation.validate(
70+
context,
71+
ActionListener.wrap(
72+
ctx -> assertThat(ctx.getValidationException(), is(nullValue())),
73+
e -> fail(e.getMessage())));
74+
}
75+
76+
public void testValidate_OneRemoteClusterVersionTooLow() {
77+
doReturn(new HashSet<>(Arrays.asList("cluster-A", "cluster-B", "cluster-C"))).when(context).getRegisteredRemoteClusterNames();
78+
doReturn(new TreeSet<>(Arrays.asList("cluster-A:dummy", "cluster-B:dummy", "cluster-C:dummy"))).when(context).resolveRemoteSource();
79+
SourceDestValidation validation = new RemoteClusterMinimumVersionValidation(MIN_EXPECTED_VERSION, REASON);
80+
validation.validate(
81+
context,
82+
ActionListener.wrap(
83+
ctx -> assertThat(
84+
ctx.getValidationException().validationErrors(),
85+
contains("remote clusters are expected to run at least version [7.11.0] (reason: [some reason]), "
86+
+ "but the following clusters were too old: [cluster-A (7.10.2)]")),
87+
e -> fail(e.getMessage())));
88+
}
89+
90+
public void testValidate_TwoRemoteClusterVersionsTooLow() {
91+
doReturn(new HashSet<>(Arrays.asList("cluster-A", "cluster-B", "cluster-C"))).when(context).getRegisteredRemoteClusterNames();
92+
doReturn(new TreeSet<>(Arrays.asList("cluster-A:dummy", "cluster-B:dummy", "cluster-C:dummy"))).when(context).resolveRemoteSource();
93+
SourceDestValidation validation = new RemoteClusterMinimumVersionValidation(Version.V_7_11_2, REASON);
94+
validation.validate(
95+
context,
96+
ActionListener.wrap(
97+
ctx -> assertThat(
98+
ctx.getValidationException().validationErrors(),
99+
contains("remote clusters are expected to run at least version [7.11.2] (reason: [some reason]), "
100+
+ "but the following clusters were too old: [cluster-A (7.10.2), cluster-B (7.11.0)]")),
101+
e -> fail(e.getMessage())));
102+
}
103+
104+
public void testValidate_NoSuchRemoteCluster() {
105+
doReturn(new HashSet<>(Arrays.asList("cluster-B", "cluster-C", "cluster-D"))).when(context).getRegisteredRemoteClusterNames();
106+
doReturn(new TreeSet<>(Arrays.asList("cluster-B:dummy", "cluster-C:dummy", "cluster-D:dummy"))).when(context).resolveRemoteSource();
107+
doThrow(new NoSuchRemoteClusterException("cluster-D")).when(context).getRemoteClusterVersion("cluster-D");
108+
SourceDestValidation validation = new RemoteClusterMinimumVersionValidation(MIN_EXPECTED_VERSION, REASON);
109+
validation.validate(
110+
context,
111+
ActionListener.wrap(
112+
ctx -> assertThat(ctx.getValidationException().validationErrors(), contains("no such remote cluster: [cluster-D]")),
113+
e -> fail(e.getMessage())));
114+
}
115+
116+
public void testValidate_OtherProblem() {
117+
doReturn(new HashSet<>(Arrays.asList("cluster-B", "cluster-C"))).when(context).getRegisteredRemoteClusterNames();
118+
doReturn(new TreeSet<>(Arrays.asList("cluster-B:dummy", "cluster-C:dummy"))).when(context).resolveRemoteSource();
119+
doThrow(new IllegalArgumentException("some-other-problem")).when(context).getRemoteClusterVersion("cluster-C");
120+
SourceDestValidation validation = new RemoteClusterMinimumVersionValidation(MIN_EXPECTED_VERSION, REASON);
121+
validation.validate(
122+
context,
123+
ActionListener.wrap(
124+
ctx -> assertThat(
125+
ctx.getValidationException().validationErrors(),
126+
contains("Error resolving remote source: some-other-problem")),
127+
e -> fail(e.getMessage())));
128+
}
129+
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigTests.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.elasticsearch.common.xcontent.XContentFactory;
1818
import org.elasticsearch.common.xcontent.XContentParser;
1919
import org.elasticsearch.common.xcontent.XContentType;
20+
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.RemoteClusterMinimumVersionValidation;
21+
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SourceDestValidation;
2022
import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase;
2123
import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfig;
2224
import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfigTests;
@@ -27,13 +29,18 @@
2729
import java.io.IOException;
2830
import java.time.Instant;
2931
import java.util.HashMap;
32+
import java.util.List;
3033
import java.util.Map;
3134

3235
import static org.elasticsearch.test.TestMatchers.matchesPattern;
3336
import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig;
3437
import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomInvalidSourceConfig;
3538
import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig;
39+
import static org.hamcrest.Matchers.empty;
3640
import static org.hamcrest.Matchers.equalTo;
41+
import static org.hamcrest.Matchers.hasSize;
42+
import static org.hamcrest.Matchers.instanceOf;
43+
import static org.hamcrest.Matchers.is;
3744

3845
public class TransformConfigTests extends AbstractSerializingTransformTestCase<TransformConfig> {
3946

@@ -530,6 +537,57 @@ public void testRewriteForBWCOfDateNormalization() throws IOException {
530537
assertEquals(Version.V_7_11_0, transformConfigRewritten.getVersion());
531538
}
532539

540+
public void testGetAdditionalValidations_WithNoRuntimeMappings() throws IOException {
541+
String transformWithRuntimeMappings = "{"
542+
+ " \"id\" : \"body_id\","
543+
+ " \"source\" : {\"index\":\"src\"},"
544+
+ " \"dest\" : {\"index\": \"dest\"},"
545+
+ " \"pivot\" : {"
546+
+ " \"group_by\": {"
547+
+ " \"id\": {"
548+
+ " \"terms\": {"
549+
+ " \"field\": \"id\""
550+
+ "} } },"
551+
+ " \"aggs\": {"
552+
+ " \"avg\": {"
553+
+ " \"avg\": {"
554+
+ " \"field\": \"points\""
555+
+ "} } } } }";
556+
557+
TransformConfig transformConfig = createTransformConfigFromString(transformWithRuntimeMappings, "body_id", true);
558+
assertThat(transformConfig.getAdditionalValidations(), is(empty()));
559+
}
560+
561+
public void testGetAdditionalValidations_WithRuntimeMappings() throws IOException {
562+
String json = "{"
563+
+ " \"id\" : \"body_id\","
564+
+ " \"source\" : {"
565+
+ " \"index\":\"src\","
566+
+ " \"runtime_mappings\":{ \"some-field\": \"some-value\" }"
567+
+ "},"
568+
+ " \"dest\" : {\"index\": \"dest\"},"
569+
+ " \"pivot\" : {"
570+
+ " \"group_by\": {"
571+
+ " \"id\": {"
572+
+ " \"terms\": {"
573+
+ " \"field\": \"id\""
574+
+ "} } },"
575+
+ " \"aggs\": {"
576+
+ " \"avg\": {"
577+
+ " \"avg\": {"
578+
+ " \"field\": \"points\""
579+
+ "} } } } }";
580+
581+
TransformConfig transformConfig = createTransformConfigFromString(json, "body_id", true);
582+
List<SourceDestValidation> additiionalValidations = transformConfig.getAdditionalValidations();
583+
assertThat(additiionalValidations, hasSize(1));
584+
assertThat(additiionalValidations.get(0), is(instanceOf(RemoteClusterMinimumVersionValidation.class)));
585+
RemoteClusterMinimumVersionValidation remoteClusterMinimumVersionValidation =
586+
(RemoteClusterMinimumVersionValidation) additiionalValidations.get(0);
587+
assertThat(remoteClusterMinimumVersionValidation.getMinExpectedVersion(), is(equalTo(Version.V_7_12_0)));
588+
assertThat(remoteClusterMinimumVersionValidation.getReason(), is(equalTo("source.runtime_mappings field was set")));
589+
}
590+
533591
private TransformConfig createTransformConfigFromString(String json, String id) throws IOException {
534592
return createTransformConfigFromString(json, id, false);
535593
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,12 @@ protected void doExecute(Task task, PreviewTransformAction.Request request, Acti
122122
ClusterState clusterState = clusterService.state();
123123

124124
final TransformConfig config = request.getConfig();
125-
126125
sourceDestValidator.validate(
127126
clusterState,
128127
config.getSource().getIndex(),
129128
config.getDestination().getIndex(),
130129
config.getDestination().getPipeline(),
131-
SourceDestValidations.PREVIEW_VALIDATIONS,
130+
SourceDestValidations.getValidationsForPreview(config.getAdditionalValidations()),
132131
ActionListener.wrap(r -> {
133132
// create the function for validation
134133
final Function function = FunctionFactory.create(config);

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ protected void masterOperation(Request request, ClusterState clusterState, Actio
211211
config.getSource().getIndex(),
212212
config.getDestination().getIndex(),
213213
config.getDestination().getPipeline(),
214-
request.isDeferValidation() ? SourceDestValidations.NON_DEFERABLE_VALIDATIONS : SourceDestValidations.ALL_VALIDATIONS,
214+
SourceDestValidations.getValidations(request.isDeferValidation(), config.getAdditionalValidations()),
215215
ActionListener.wrap(
216216
validationResponse -> {
217217
// Early check to verify that the user can create the destination index and can read from the source

0 commit comments

Comments
 (0)