Skip to content

[7.12] [Transform] Make sure remote clusters are at least 7.12 if transform uses search runtime fields (#69170) #69511

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.core.common.validation;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
Expand All @@ -33,11 +34,16 @@
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

import static java.util.Map.Entry.comparingByKey;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toMap;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.cluster.metadata.MetadataCreateIndexService.validateIndexOrAliasName;

Expand All @@ -64,6 +70,8 @@ public final class SourceDestValidator {
public static final String REMOTE_CLUSTER_LICENSE_INACTIVE = "License check failed for remote cluster "
+ "alias [{0}], license is not active";
public static final String REMOTE_SOURCE_INDICES_NOT_SUPPORTED = "remote source indices are not supported";
public static final String REMOTE_CLUSTERS_TOO_OLD =
"remote clusters are expected to run at least version [{0}] (reason: [{1}]), but the following clusters were too old: [{2}]";
public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";

// workaround for 7.x: remoteClusterAliases does not throw
Expand Down Expand Up @@ -216,6 +224,11 @@ public Set<String> getRegisteredRemoteClusterNames() {
return remoteClusterService.getRegisteredRemoteClusterNames();
}

// convenience method to make testing easier
public Version getRemoteClusterVersion(String cluster) {
return remoteClusterService.getConnection(cluster).getVersion();
}

private void resolveLocalAndRemoteSource() {
resolvedSource = new TreeSet<>(Arrays.asList(source));
resolvedRemoteSource = new TreeSet<>(RemoteClusterLicenseChecker.remoteIndices(resolvedSource));
Expand Down Expand Up @@ -425,6 +438,59 @@ public void validate(Context context, ActionListener<Context> listener) {
}
}

public static class RemoteClusterMinimumVersionValidation implements SourceDestValidation {

private final Version minExpectedVersion;
private final String reason;

public RemoteClusterMinimumVersionValidation(Version minExpectedVersion, String reason) {
this.minExpectedVersion = minExpectedVersion;
this.reason = reason;
}

public Version getMinExpectedVersion() {
return minExpectedVersion;
}

public String getReason() {
return reason;
}

@Override
public void validate(Context context, ActionListener<Context> listener) {
List<String> remoteIndices = new ArrayList<>(context.resolveRemoteSource());
Map<String, Version> remoteClusterVersions;
try {
List<String> remoteAliases =
RemoteClusterLicenseChecker.remoteClusterAliases(context.getRegisteredRemoteClusterNames(), remoteIndices);
remoteClusterVersions = remoteAliases.stream().collect(toMap(identity(), context::getRemoteClusterVersion));
} catch (NoSuchRemoteClusterException e) {
context.addValidationError(e.getMessage());
listener.onResponse(context);
return;
} catch (Exception e) {
context.addValidationError(ERROR_REMOTE_CLUSTER_SEARCH, e.getMessage());
listener.onResponse(context);
return;
}
Map<String, Version> oldRemoteClusterVersions =
remoteClusterVersions.entrySet().stream()
.filter(entry -> entry.getValue().before(minExpectedVersion))
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
if (oldRemoteClusterVersions.isEmpty() == false) {
context.addValidationError(
REMOTE_CLUSTERS_TOO_OLD,
minExpectedVersion,
reason,
oldRemoteClusterVersions.entrySet().stream()
.sorted(comparingByKey()) // sort to have a deterministic order among clusters in the resulting string
.map(e -> e.getKey() + " (" + e.getValue() + ")")
.collect(joining(", ")));
}
listener.onResponse(context);
}
}

static class DestinationInSourceValidation implements SourceDestValidation {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SourceDestValidation;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfig;
Expand All @@ -32,6 +34,7 @@
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand All @@ -45,6 +48,8 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement

public static final String NAME = "data_frame_transform_config";
public static final ParseField HEADERS = new ParseField("headers");
/** Version in which {@code FieldCapabilitiesRequest.runtime_fields} field was introduced. */
private static final Version FIELD_CAPS_RUNTIME_MAPPINGS_INTRODUCED_VERSION = Version.V_7_12_0;

// types of transforms
public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
Expand Down Expand Up @@ -308,6 +313,22 @@ public RetentionPolicyConfig getRetentionPolicyConfig() {
return retentionPolicyConfig;
}

/**
* Determines the minimum version of a cluster in multi-cluster setup that is needed to successfully run this transform config.
*
* @return version
*/
public List<SourceDestValidation> getAdditionalValidations() {
if ((source.getRuntimeMappings() == null || source.getRuntimeMappings().isEmpty()) == false) {
SourceDestValidation validation =
new SourceDestValidator.RemoteClusterMinimumVersionValidation(
FIELD_CAPS_RUNTIME_MAPPINGS_INTRODUCED_VERSION, "source.runtime_mappings field was set");
return Collections.singletonList(validation);
} else {
return Collections.emptyList();
}
}

public ActionRequestValidationException validate(ActionRequestValidationException validationException) {
if (pivotConfig != null) {
validationException = pivotConfig.validate(validationException);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.common.validation;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.Context;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.RemoteClusterMinimumVersionValidation;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SourceDestValidation;
import org.junit.Before;

import java.util.Arrays;
import java.util.HashSet;
import java.util.TreeSet;

import static java.util.Collections.emptySet;
import static java.util.Collections.emptySortedSet;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;

public class RemoteClusterMinimumVersionValidationTests extends ESTestCase {

private static final Version MIN_EXPECTED_VERSION = Version.V_7_11_0;
private static final String REASON = "some reason";

private Context context;

@Before
public void setUpMocks() {
context = spy(new Context(null, null, null, null, null, null, null, null, null, null));
doReturn(Version.V_7_10_2).when(context).getRemoteClusterVersion("cluster-A");
doReturn(Version.V_7_11_0).when(context).getRemoteClusterVersion("cluster-B");
doReturn(Version.V_7_11_2).when(context).getRemoteClusterVersion("cluster-C");
}

public void testGetters() {
RemoteClusterMinimumVersionValidation validation = new RemoteClusterMinimumVersionValidation(MIN_EXPECTED_VERSION, REASON);
assertThat(validation.getMinExpectedVersion(), is(equalTo(MIN_EXPECTED_VERSION)));
assertThat(validation.getReason(), is(equalTo(REASON)));
}

public void testValidate_NoRemoteClusters() {
doReturn(emptySet()).when(context).getRegisteredRemoteClusterNames();
doReturn(emptySortedSet()).when(context).resolveRemoteSource();
SourceDestValidation validation = new RemoteClusterMinimumVersionValidation(MIN_EXPECTED_VERSION, REASON);
validation.validate(
context,
ActionListener.wrap(
ctx -> assertThat(ctx.getValidationException(), is(nullValue())),
e -> fail(e.getMessage())));
}

public void testValidate_RemoteClustersVersionsOk() {
doReturn(new HashSet<>(Arrays.asList("cluster-B", "cluster-C"))).when(context).getRegisteredRemoteClusterNames();
doReturn(new TreeSet<>(Arrays.asList("cluster-B:dummy", "cluster-C:dummy"))).when(context).resolveRemoteSource();
SourceDestValidation validation = new RemoteClusterMinimumVersionValidation(MIN_EXPECTED_VERSION, REASON);
validation.validate(
context,
ActionListener.wrap(
ctx -> assertThat(ctx.getValidationException(), is(nullValue())),
e -> fail(e.getMessage())));
}

public void testValidate_OneRemoteClusterVersionTooLow() {
doReturn(new HashSet<>(Arrays.asList("cluster-A", "cluster-B", "cluster-C"))).when(context).getRegisteredRemoteClusterNames();
doReturn(new TreeSet<>(Arrays.asList("cluster-A:dummy", "cluster-B:dummy", "cluster-C:dummy"))).when(context).resolveRemoteSource();
SourceDestValidation validation = new RemoteClusterMinimumVersionValidation(MIN_EXPECTED_VERSION, REASON);
validation.validate(
context,
ActionListener.wrap(
ctx -> assertThat(
ctx.getValidationException().validationErrors(),
contains("remote clusters are expected to run at least version [7.11.0] (reason: [some reason]), "
+ "but the following clusters were too old: [cluster-A (7.10.2)]")),
e -> fail(e.getMessage())));
}

public void testValidate_TwoRemoteClusterVersionsTooLow() {
doReturn(new HashSet<>(Arrays.asList("cluster-A", "cluster-B", "cluster-C"))).when(context).getRegisteredRemoteClusterNames();
doReturn(new TreeSet<>(Arrays.asList("cluster-A:dummy", "cluster-B:dummy", "cluster-C:dummy"))).when(context).resolveRemoteSource();
SourceDestValidation validation = new RemoteClusterMinimumVersionValidation(Version.V_7_11_2, REASON);
validation.validate(
context,
ActionListener.wrap(
ctx -> assertThat(
ctx.getValidationException().validationErrors(),
contains("remote clusters are expected to run at least version [7.11.2] (reason: [some reason]), "
+ "but the following clusters were too old: [cluster-A (7.10.2), cluster-B (7.11.0)]")),
e -> fail(e.getMessage())));
}

public void testValidate_NoSuchRemoteCluster() {
doReturn(new HashSet<>(Arrays.asList("cluster-B", "cluster-C", "cluster-D"))).when(context).getRegisteredRemoteClusterNames();
doReturn(new TreeSet<>(Arrays.asList("cluster-B:dummy", "cluster-C:dummy", "cluster-D:dummy"))).when(context).resolveRemoteSource();
doThrow(new NoSuchRemoteClusterException("cluster-D")).when(context).getRemoteClusterVersion("cluster-D");
SourceDestValidation validation = new RemoteClusterMinimumVersionValidation(MIN_EXPECTED_VERSION, REASON);
validation.validate(
context,
ActionListener.wrap(
ctx -> assertThat(ctx.getValidationException().validationErrors(), contains("no such remote cluster: [cluster-D]")),
e -> fail(e.getMessage())));
}

public void testValidate_OtherProblem() {
doReturn(new HashSet<>(Arrays.asList("cluster-B", "cluster-C"))).when(context).getRegisteredRemoteClusterNames();
doReturn(new TreeSet<>(Arrays.asList("cluster-B:dummy", "cluster-C:dummy"))).when(context).resolveRemoteSource();
doThrow(new IllegalArgumentException("some-other-problem")).when(context).getRemoteClusterVersion("cluster-C");
SourceDestValidation validation = new RemoteClusterMinimumVersionValidation(MIN_EXPECTED_VERSION, REASON);
validation.validate(
context,
ActionListener.wrap(
ctx -> assertThat(
ctx.getValidationException().validationErrors(),
contains("Error resolving remote source: some-other-problem")),
e -> fail(e.getMessage())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.RemoteClusterMinimumVersionValidation;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SourceDestValidation;
import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase;
import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfig;
import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfigTests;
Expand All @@ -27,13 +29,18 @@
import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.TestMatchers.matchesPattern;
import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig;
import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomInvalidSourceConfig;
import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;

public class TransformConfigTests extends AbstractSerializingTransformTestCase<TransformConfig> {

Expand Down Expand Up @@ -530,6 +537,57 @@ public void testRewriteForBWCOfDateNormalization() throws IOException {
assertEquals(Version.V_7_11_0, transformConfigRewritten.getVersion());
}

public void testGetAdditionalValidations_WithNoRuntimeMappings() throws IOException {
String transformWithRuntimeMappings = "{"
+ " \"id\" : \"body_id\","
+ " \"source\" : {\"index\":\"src\"},"
+ " \"dest\" : {\"index\": \"dest\"},"
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"id\": {"
+ " \"terms\": {"
+ " \"field\": \"id\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } } } }";

TransformConfig transformConfig = createTransformConfigFromString(transformWithRuntimeMappings, "body_id", true);
assertThat(transformConfig.getAdditionalValidations(), is(empty()));
}

public void testGetAdditionalValidations_WithRuntimeMappings() throws IOException {
String json = "{"
+ " \"id\" : \"body_id\","
+ " \"source\" : {"
+ " \"index\":\"src\","
+ " \"runtime_mappings\":{ \"some-field\": \"some-value\" }"
+ "},"
+ " \"dest\" : {\"index\": \"dest\"},"
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"id\": {"
+ " \"terms\": {"
+ " \"field\": \"id\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } } } }";

TransformConfig transformConfig = createTransformConfigFromString(json, "body_id", true);
List<SourceDestValidation> additiionalValidations = transformConfig.getAdditionalValidations();
assertThat(additiionalValidations, hasSize(1));
assertThat(additiionalValidations.get(0), is(instanceOf(RemoteClusterMinimumVersionValidation.class)));
RemoteClusterMinimumVersionValidation remoteClusterMinimumVersionValidation =
(RemoteClusterMinimumVersionValidation) additiionalValidations.get(0);
assertThat(remoteClusterMinimumVersionValidation.getMinExpectedVersion(), is(equalTo(Version.V_7_12_0)));
assertThat(remoteClusterMinimumVersionValidation.getReason(), is(equalTo("source.runtime_mappings field was set")));
}

private TransformConfig createTransformConfigFromString(String json, String id) throws IOException {
return createTransformConfigFromString(json, id, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,12 @@ protected void doExecute(Task task, PreviewTransformAction.Request request, Acti
ClusterState clusterState = clusterService.state();

final TransformConfig config = request.getConfig();

sourceDestValidator.validate(
clusterState,
config.getSource().getIndex(),
config.getDestination().getIndex(),
config.getDestination().getPipeline(),
SourceDestValidations.PREVIEW_VALIDATIONS,
SourceDestValidations.getValidationsForPreview(config.getAdditionalValidations()),
ActionListener.wrap(r -> {
// create the function for validation
final Function function = FunctionFactory.create(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ protected void masterOperation(Request request, ClusterState clusterState, Actio
config.getSource().getIndex(),
config.getDestination().getIndex(),
config.getDestination().getPipeline(),
request.isDeferValidation() ? SourceDestValidations.NON_DEFERABLE_VALIDATIONS : SourceDestValidations.ALL_VALIDATIONS,
SourceDestValidations.getValidations(request.isDeferValidation(), config.getAdditionalValidations()),
ActionListener.wrap(
validationResponse -> {
// Early check to verify that the user can create the destination index and can read from the source
Expand Down
Loading