Skip to content

Commit 79f8987

Browse files
authored
[7.x] Redirect transform actions to transform&remote_cluster_client node when needed (elastic#70125) (elastic#71853)
1 parent 8b32746 commit 79f8987

File tree

39 files changed

+1185
-331
lines changed

39 files changed

+1185
-331
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidator.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import static java.util.stream.Collectors.toMap;
4747
import static org.elasticsearch.action.ValidateActions.addValidationError;
4848
import static org.elasticsearch.cluster.metadata.MetadataCreateIndexService.validateIndexOrAliasName;
49+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE;
4950

5051
/**
5152
* Validation of source indexes and destination index.
@@ -61,7 +62,7 @@ public final class SourceDestValidator {
6162
public static final String DEST_LOWERCASE = "Destination index [{0}] must be lowercase";
6263
public static final String NEEDS_REMOTE_CLUSTER_SEARCH = "Source index is configured with a remote index pattern(s) [{0}]"
6364
+ " but the current node [{1}] is not allowed to connect to remote clusters."
64-
+ " Please enable remote.cluster_client for all data nodes.";
65+
+ " Please enable " + REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " for all {2} nodes.";
6566
public static final String ERROR_REMOTE_CLUSTER_SEARCH = "Error resolving remote source: {0}";
6667
public static final String UNKNOWN_REMOTE_CLUSTER_LICENSE = "Error during license check ({0}) for remote cluster "
6768
+ "alias(es) {1}, error: {2}";
@@ -259,7 +260,6 @@ public interface SourceDestValidation {
259260
.strictExpandOpenAndForbidClosedIgnoreThrottled();
260261

261262
public static final SourceDestValidation SOURCE_MISSING_VALIDATION = new SourceMissingValidation();
262-
public static final SourceDestValidation REMOTE_SOURCE_VALIDATION = new RemoteSourceEnabledAndRemoteLicenseValidation();
263263
public static final SourceDestValidation DESTINATION_IN_SOURCE_VALIDATION = new DestinationInSourceValidation();
264264
public static final SourceDestValidation DESTINATION_SINGLE_INDEX_VALIDATION = new DestinationSingleIndexValidation();
265265
public static final SourceDestValidation REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION = new RemoteSourceNotSupportedValidation();
@@ -386,7 +386,14 @@ public void validate(Context context, ActionListener<Context> listener) {
386386
}
387387
}
388388

389-
static class RemoteSourceEnabledAndRemoteLicenseValidation implements SourceDestValidation {
389+
public static class RemoteSourceEnabledAndRemoteLicenseValidation implements SourceDestValidation {
390+
391+
private final String nodeRoleThatRequiresRemoteClusterClient;
392+
393+
public RemoteSourceEnabledAndRemoteLicenseValidation(String nodeRoleThatRequiresRemoteClusterClient) {
394+
this.nodeRoleThatRequiresRemoteClusterClient = nodeRoleThatRequiresRemoteClusterClient;
395+
}
396+
390397
@Override
391398
public void validate(Context context, ActionListener<Context> listener) {
392399
if (context.resolveRemoteSource().isEmpty()) {
@@ -398,7 +405,11 @@ public void validate(Context context, ActionListener<Context> listener) {
398405
// we can only check this node at the moment, clusters with mixed CCS enabled/disabled nodes are not supported,
399406
// see gh#50033
400407
if (context.isRemoteSearchEnabled() == false) {
401-
context.addValidationError(NEEDS_REMOTE_CLUSTER_SEARCH, context.resolveRemoteSource(), context.getNodeName());
408+
context.addValidationError(
409+
NEEDS_REMOTE_CLUSTER_SEARCH,
410+
context.resolveRemoteSource(),
411+
context.getNodeName(),
412+
nodeRoleThatRequiresRemoteClusterClient);
402413
listener.onResponse(context);
403414
return;
404415
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public final class TransformField {
4141
public static final ParseField SYNC = new ParseField("sync");
4242
public static final ParseField TIME = new ParseField("time");
4343
public static final ParseField DELAY = new ParseField("delay");
44+
// TODO: Rename to "defer_data_validation" or similar to emphasize that not all validation is deferred
4445
public static final ParseField DEFER_VALIDATION = new ParseField("defer_validation");
4546
public static final ParseField RETENTION_POLICY = new ParseField("retention_policy");
4647
public static final ParseField MAX_AGE = new ParseField("max_age");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.transform.action;
9+
10+
import org.elasticsearch.action.ActionRequestValidationException;
11+
import org.elasticsearch.action.ActionResponse;
12+
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
14+
import org.elasticsearch.common.io.stream.StreamInput;
15+
import org.elasticsearch.common.io.stream.StreamOutput;
16+
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
17+
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
18+
19+
import java.io.IOException;
20+
import java.util.Map;
21+
import java.util.Objects;
22+
23+
public class ValidateTransformAction extends ActionType<ValidateTransformAction.Response> {
24+
25+
public static final ValidateTransformAction INSTANCE = new ValidateTransformAction();
26+
public static final String NAME = "cluster:admin/transform/validate";
27+
28+
private ValidateTransformAction() {
29+
super(NAME, ValidateTransformAction.Response::new);
30+
}
31+
32+
public static class Request extends AcknowledgedRequest<Request> {
33+
34+
private final TransformConfig config;
35+
private final boolean deferValidation;
36+
37+
public Request(TransformConfig config, boolean deferValidation) {
38+
this.config = config;
39+
this.deferValidation = deferValidation;
40+
}
41+
42+
public Request(StreamInput in) throws IOException {
43+
super(in);
44+
this.config = new TransformConfig(in);
45+
this.deferValidation = in.readBoolean();
46+
}
47+
48+
@Override
49+
public ActionRequestValidationException validate() {
50+
ActionRequestValidationException validationException = null;
51+
52+
validationException = config.validate(validationException);
53+
validationException = SourceDestValidator.validateRequest(
54+
validationException,
55+
config.getDestination() != null ? config.getDestination().getIndex() : null
56+
);
57+
58+
return validationException;
59+
}
60+
61+
public TransformConfig getConfig() {
62+
return config;
63+
}
64+
65+
public boolean isDeferValidation() {
66+
return deferValidation;
67+
}
68+
69+
@Override
70+
public void writeTo(StreamOutput out) throws IOException {
71+
super.writeTo(out);
72+
this.config.writeTo(out);
73+
out.writeBoolean(this.deferValidation);
74+
}
75+
76+
@Override
77+
public boolean equals(Object obj) {
78+
if (obj == this) {
79+
return true;
80+
}
81+
if (obj == null || getClass() != obj.getClass()) {
82+
return false;
83+
}
84+
Request that = (Request) obj;
85+
return Objects.equals(config, that.config)
86+
&& deferValidation == that.deferValidation;
87+
}
88+
89+
@Override
90+
public int hashCode() {
91+
return Objects.hash(config, deferValidation);
92+
}
93+
}
94+
95+
public static class Response extends ActionResponse {
96+
97+
private final Map<String, String> destIndexMappings;
98+
99+
public Response(Map<String, String> destIndexMappings) {
100+
this.destIndexMappings = destIndexMappings;
101+
}
102+
103+
public Response(StreamInput in) throws IOException {
104+
this.destIndexMappings = in.readMap(StreamInput::readString, StreamInput::readString);
105+
}
106+
107+
public void writeTo(StreamOutput out) throws IOException {
108+
out.writeMap(destIndexMappings, StreamOutput::writeString, StreamOutput::writeString);
109+
}
110+
111+
public Map<String, String> getDestIndexMappings() {
112+
return destIndexMappings;
113+
}
114+
115+
@Override
116+
public boolean equals(Object obj) {
117+
if (obj == this) {
118+
return true;
119+
}
120+
if (obj == null || obj.getClass() != getClass()) {
121+
return false;
122+
}
123+
Response that = (Response) obj;
124+
return Objects.equals(this.destIndexMappings, that.destIndexMappings);
125+
}
126+
127+
@Override
128+
public int hashCode() {
129+
return Objects.hash(destIndexMappings);
130+
}
131+
}
132+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformDestIndexSettings.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.common.io.stream.Writeable;
1616
import org.elasticsearch.common.settings.Settings;
1717
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
18-
import org.elasticsearch.common.xcontent.ToXContent.Params;
1918
import org.elasticsearch.common.xcontent.ToXContentObject;
2019
import org.elasticsearch.common.xcontent.XContentBuilder;
2120
import org.elasticsearch.common.xcontent.XContentParser;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParams.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,12 @@ public class TransformTaskParams extends AbstractDiffable<TransformTaskParams> i
4444
}
4545

4646
private TransformTaskParams(String transformId, String version, String frequency, Boolean remote) {
47-
this(transformId, version == null ? null : Version.fromString(version),
47+
this(
48+
transformId,
49+
version == null ? null : Version.fromString(version),
4850
frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName()),
49-
remote == null ? false : remote.booleanValue()
50-
);
51+
remote == null ? false : remote.booleanValue()
52+
);
5153
}
5254

5355
public TransformTaskParams(String transformId, Version version, TimeValue frequency, boolean remote) {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@
6666
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_IN_SOURCE_VALIDATION;
6767
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_PIPELINE_MISSING_VALIDATION;
6868
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_SINGLE_INDEX_VALIDATION;
69-
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.REMOTE_SOURCE_VALIDATION;
7069
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SOURCE_MISSING_VALIDATION;
7170
import static org.hamcrest.Matchers.equalTo;
7271
import static org.mockito.Mockito.mock;
@@ -85,6 +84,9 @@ public class SourceDestValidatorTests extends ESTestCase {
8584

8685
private static final ClusterState CLUSTER_STATE;
8786

87+
private static final String DUMMY_NODE_ROLE = "dummy";
88+
private static final SourceDestValidator.SourceDestValidation REMOTE_SOURCE_VALIDATION =
89+
new RemoteSourceEnabledAndRemoteLicenseValidation(DUMMY_NODE_ROLE);
8890
private static final List<SourceDestValidator.SourceDestValidation> TEST_VALIDATIONS = Arrays.asList(
8991
SOURCE_MISSING_VALIDATION,
9092
DESTINATION_IN_SOURCE_VALIDATION,
@@ -670,7 +672,7 @@ public void testRemoteSourceBasic() throws InterruptedException {
670672
);
671673

672674
when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC));
673-
RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation();
675+
RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation(DUMMY_NODE_ROLE);
674676

675677
assertValidationWithContext(
676678
listener -> validator.validate(context, listener),
@@ -697,7 +699,7 @@ public void testRemoteSourcePlatinum() throws InterruptedException {
697699
);
698700

699701
when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC));
700-
final RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation();
702+
final RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation(DUMMY_NODE_ROLE);
701703

702704
assertValidationWithContext(listener -> validator.validate(context, listener), c -> {
703705
assertNotNull(c.getValidationException());
@@ -752,7 +754,7 @@ public void testRemoteSourcePlatinum() throws InterruptedException {
752754
);
753755
when(context3.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_PLATINUM));
754756

755-
final RemoteSourceEnabledAndRemoteLicenseValidation validator3 = new RemoteSourceEnabledAndRemoteLicenseValidation();
757+
final RemoteSourceEnabledAndRemoteLicenseValidation validator3 = new RemoteSourceEnabledAndRemoteLicenseValidation(DUMMY_NODE_ROLE);
756758
assertValidationWithContext(
757759
listener -> validator3.validate(context3, listener),
758760
c -> { assertNull(c.getValidationException()); },
@@ -776,7 +778,7 @@ public void testRemoteSourcePlatinum() throws InterruptedException {
776778
);
777779
when(context4.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_PLATINUM));
778780

779-
final RemoteSourceEnabledAndRemoteLicenseValidation validator4 = new RemoteSourceEnabledAndRemoteLicenseValidation();
781+
final RemoteSourceEnabledAndRemoteLicenseValidation validator4 = new RemoteSourceEnabledAndRemoteLicenseValidation(DUMMY_NODE_ROLE);
780782
assertValidationWithContext(
781783
listener -> validator4.validate(context4, listener),
782784
c -> { assertNull(c.getValidationException()); },
@@ -802,7 +804,7 @@ public void testRemoteSourceLicenseInActive() throws InterruptedException {
802804
);
803805

804806
when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC));
805-
final RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation();
807+
final RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation(DUMMY_NODE_ROLE);
806808
assertValidationWithContext(listener -> validator.validate(context, listener), c -> {
807809
assertNotNull(c.getValidationException());
808810
assertEquals(1, c.getValidationException().validationErrors().size());
@@ -831,7 +833,7 @@ public void testRemoteSourceDoesNotExist() throws InterruptedException {
831833
);
832834

833835
when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC));
834-
RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation();
836+
RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation(DUMMY_NODE_ROLE);
835837

836838
assertValidationWithContext(listener -> validator.validate(context, listener), c -> {
837839
assertNotNull(c.getValidationException());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.transform.action;
9+
10+
import org.elasticsearch.common.io.stream.Writeable.Reader;
11+
import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction.Request;
12+
13+
import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig;
14+
15+
public class ValidateTransformActionRequestTests extends AbstractWireSerializingTransformTestCase<Request> {
16+
17+
@Override
18+
protected Request createTestInstance() {
19+
return new Request(randomTransformConfig(), randomBoolean());
20+
}
21+
22+
@Override
23+
protected Reader<Request> instanceReader() {
24+
return Request::new;
25+
}
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.transform.action;
9+
10+
import org.elasticsearch.common.io.stream.Writeable.Reader;
11+
import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction.Response;
12+
13+
import java.util.Map;
14+
import java.util.function.Function;
15+
16+
import static java.util.stream.Collectors.toMap;
17+
18+
public class ValidateTransformActionResponseTests extends AbstractWireSerializingTransformTestCase<Response> {
19+
20+
@Override
21+
protected Response createTestInstance() {
22+
return new Response(randomDestIndexMappings());
23+
}
24+
25+
private Map<String, String> randomDestIndexMappings() {
26+
return randomList(1, 20, () -> randomAlphaOfLengthBetween(1, 20)).stream()
27+
.distinct()
28+
.collect(toMap(Function.identity(), fieldName -> randomAlphaOfLengthBetween(1, 20)));
29+
}
30+
31+
@Override
32+
protected Reader<Response> instanceReader() {
33+
return Response::new;
34+
}
35+
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformDestIndexSettingsTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,21 @@
77

88
package org.elasticsearch.xpack.core.transform.transforms;
99

10+
import org.elasticsearch.Version;
1011
import org.elasticsearch.action.admin.indices.alias.Alias;
1112
import org.elasticsearch.common.io.stream.Writeable.Reader;
1213
import org.elasticsearch.common.settings.Settings;
1314
import org.elasticsearch.common.xcontent.XContentParser;
1415
import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase;
1516

1617
import java.io.IOException;
17-
import java.util.Collections;
1818
import java.util.HashMap;
1919
import java.util.HashSet;
2020
import java.util.Map;
2121
import java.util.Set;
2222

23+
import static java.util.Collections.singletonMap;
24+
2325
public class TransformDestIndexSettingsTests extends AbstractSerializingTransformTestCase<TransformDestIndexSettings> {
2426

2527
public static TransformDestIndexSettings randomDestIndexSettings() {
@@ -29,9 +31,9 @@ public static TransformDestIndexSettings randomDestIndexSettings() {
2931

3032
if (randomBoolean()) {
3133
mappings = new HashMap<>(size);
32-
34+
mappings.put("_meta", singletonMap("_transform", singletonMap("version", Version.CURRENT.toString())));
3335
for (int i = 0; i < size; i++) {
34-
mappings.put(randomAlphaOfLength(10), Collections.singletonMap("type", randomAlphaOfLength(10)));
36+
mappings.put(randomAlphaOfLength(10), singletonMap("type", randomAlphaOfLength(10)));
3537
}
3638
}
3739

x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public class Constants {
8989
"cluster:admin/transform/start",
9090
"cluster:admin/transform/stop",
9191
"cluster:admin/transform/update",
92+
"cluster:admin/transform/validate",
9293
// "cluster:admin/voting_config/add_exclusions",
9394
// "cluster:admin/voting_config/clear_exclusions",
9495
"cluster:admin/xpack/ccr/auto_follow_pattern/activate",

0 commit comments

Comments
 (0)