Skip to content

Commit a328a8e

Browse files
author
Hendrik Muhs
authored
[7.x][Transform] implement node.transform to control where to… (#52998)
implement transform node attributes to disable transform on certain nodes and test which nodes are allowed to do remote connections closes #52200 closes #50033 closes #48734 backport #52712
1 parent db64029 commit a328a8e

File tree

14 files changed

+676
-212
lines changed

14 files changed

+676
-212
lines changed

docs/reference/settings/data-frames-settings.asciidoc renamed to docs/reference/settings/transform-settings.asciidoc

+18-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11

22
[role="xpack"]
3-
[[data-frames-settings]]
3+
[[transform-settings]]
44
=== {transforms-cap} settings in Elasticsearch
55
[subs="attributes"]
66
++++
@@ -9,17 +9,30 @@
99

1010
You do not need to configure any settings to use {transforms}. It is enabled by default.
1111

12-
All of these settings can be added to the `elasticsearch.yml` configuration file.
13-
The dynamic settings can also be updated across a cluster with the
12+
All of these settings can be added to the `elasticsearch.yml` configuration file.
13+
The dynamic settings can also be updated across a cluster with the
1414
<<cluster-update-settings,cluster update settings API>>.
1515

16-
TIP: Dynamic settings take precedence over settings in the `elasticsearch.yml`
16+
TIP: Dynamic settings take precedence over settings in the `elasticsearch.yml`
1717
file.
1818

1919
[float]
20-
[[general-data-frames-settings]]
20+
[[general-transform-settings]]
2121
==== General {transforms} settings
2222

23+
`node.transform`::
24+
Set to `true` to identify the node as a _transform node_. The default is `false` if
25+
either `node.data` or `xpack.transform.enabled` is `false` for the node, and `true` otherwise. +
26+
+
27+
If set to `false` in `elasticsearch.yml`, the node cannot run transforms. If set to
28+
`true` but `xpack.transform.enabled` is set to `false`, the `node.transform` setting is
29+
ignored and the node cannot run transforms. If you want to run transforms, there must be at
30+
least one transform node in your cluster. +
31+
+
32+
IMPORTANT: It is advised to use the `node.transform` setting to constrain the execution
33+
of transforms to certain nodes instead of using `xpack.transform.enabled`. On dedicated
34+
coordinating nodes or dedicated master nodes, disable the node.transform role.
35+
2336
`xpack.transform.enabled`::
2437
Set to `true` (default) to enable {transforms} on the node. +
2538
+

docs/reference/setup.asciidoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ include::settings/audit-settings.asciidoc[]
5151

5252
include::settings/ccr-settings.asciidoc[]
5353

54-
include::settings/data-frames-settings.asciidoc[]
54+
include::settings/transform-settings.asciidoc[]
5555

5656
include::settings/ilm-settings.asciidoc[]
5757

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SourceConfig.java

+5
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.common.xcontent.ToXContentObject;
1616
import org.elasticsearch.common.xcontent.XContentBuilder;
1717
import org.elasticsearch.common.xcontent.XContentParser;
18+
import org.elasticsearch.license.RemoteClusterLicenseChecker;
1819
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
1920

2021
import java.io.IOException;
@@ -98,6 +99,10 @@ public boolean isValid() {
9899
return queryConfig.isValid();
99100
}
100101

102+
public boolean requiresRemoteCluster() {
103+
return Arrays.stream(index).anyMatch(RemoteClusterLicenseChecker::isRemoteIndex);
104+
}
105+
101106
@Override
102107
public void writeTo(StreamOutput out) throws IOException {
103108
out.writeStringArray(index);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParams.java

+26-6
Original file line numberDiff line numberDiff line change
@@ -25,29 +25,35 @@ public class TransformTaskParams extends AbstractDiffable<TransformTaskParams> i
2525

2626
public static final String NAME = TransformField.TASK_NAME;
2727
public static final ParseField FREQUENCY = TransformField.FREQUENCY;
28+
public static final ParseField REQUIRES_REMOTE = new ParseField("requires_remote");
2829

2930
private final String transformId;
3031
private final Version version;
3132
private final TimeValue frequency;
33+
private final Boolean requiresRemote;
3234

3335
public static final ConstructingObjectParser<TransformTaskParams, Void> PARSER = new ConstructingObjectParser<>(NAME, true,
34-
a -> new TransformTaskParams((String) a[0], (String) a[1], (String) a[2]));
36+
a -> new TransformTaskParams((String) a[0], (String) a[1], (String) a[2], (Boolean) a[3]));
3537

3638
static {
3739
PARSER.declareString(ConstructingObjectParser.constructorArg(), TransformField.ID);
3840
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TransformField.VERSION);
3941
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY);
42+
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REQUIRES_REMOTE);
4043
}
4144

42-
private TransformTaskParams(String transformId, String version, String frequency) {
45+
private TransformTaskParams(String transformId, String version, String frequency, Boolean remote) {
4346
this(transformId, version == null ? null : Version.fromString(version),
44-
frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName()));
47+
frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName()),
48+
remote == null ? false : remote.booleanValue()
49+
);
4550
}
4651

47-
public TransformTaskParams(String transformId, Version version, TimeValue frequency) {
52+
public TransformTaskParams(String transformId, Version version, TimeValue frequency, boolean remote) {
4853
this.transformId = transformId;
4954
this.version = version == null ? Version.V_7_2_0 : version;
5055
this.frequency = frequency;
56+
this.requiresRemote = remote;
5157
}
5258

5359
public TransformTaskParams(StreamInput in) throws IOException {
@@ -62,6 +68,11 @@ public TransformTaskParams(StreamInput in) throws IOException {
6268
} else {
6369
this.frequency = null;
6470
}
71+
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
72+
this.requiresRemote = in.readBoolean();
73+
} else {
74+
this.requiresRemote = false;
75+
}
6576
}
6677

6778
@Override
@@ -83,6 +94,9 @@ public void writeTo(StreamOutput out) throws IOException {
8394
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
8495
out.writeOptionalTimeValue(frequency);
8596
}
97+
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
98+
out.writeBoolean(requiresRemote);
99+
}
86100
}
87101

88102
@Override
@@ -93,6 +107,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
93107
if (frequency != null) {
94108
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
95109
}
110+
builder.field(REQUIRES_REMOTE.getPreferredName(), requiresRemote);
96111
builder.endObject();
97112
return builder;
98113
}
@@ -109,6 +124,10 @@ public TimeValue getFrequency() {
109124
return frequency;
110125
}
111126

127+
public boolean requiresRemote() {
128+
return requiresRemote;
129+
}
130+
112131
public static TransformTaskParams fromXContent(XContentParser parser) throws IOException {
113132
return PARSER.parse(parser, null);
114133
}
@@ -127,11 +146,12 @@ public boolean equals(Object other) {
127146

128147
return Objects.equals(this.transformId, that.transformId)
129148
&& Objects.equals(this.version, that.version)
130-
&& Objects.equals(this.frequency, that.frequency);
149+
&& Objects.equals(this.frequency, that.frequency)
150+
&& this.requiresRemote == that.requiresRemote;
131151
}
132152

133153
@Override
134154
public int hashCode() {
135-
return Objects.hash(transformId, version, frequency);
155+
return Objects.hash(transformId, version, frequency, requiresRemote);
136156
}
137157
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SourceConfigTests.java

+17
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,21 @@ protected Reader<SourceConfig> instanceReader() {
5959
return SourceConfig::new;
6060
}
6161

62+
public void testRequiresRemoteCluster() {
63+
assertFalse(new SourceConfig(new String [] {"index1", "index2", "index3"},
64+
QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());
65+
66+
assertTrue(new SourceConfig(new String [] {"index1", "remote2:index2", "index3"},
67+
QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());
68+
69+
assertTrue(new SourceConfig(new String [] {"index1", "index2", "remote3:index3"},
70+
QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());
71+
72+
assertTrue(new SourceConfig(new String [] {"index1", "remote2:index2", "remote3:index3"},
73+
QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());
74+
75+
assertTrue(new SourceConfig(new String [] {"remote1:index1"},
76+
QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());
77+
}
78+
6279
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ protected TransformTaskParams doParseInstance(XContentParser parser) throws IOEx
2727
@Override
2828
protected TransformTaskParams createTestInstance() {
2929
return new TransformTaskParams(randomAlphaOfLength(10), randomBoolean() ? null : Version.CURRENT,
30-
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)));
30+
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), randomBoolean());
3131
}
3232

3333
@Override

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java

+61-4
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@
1414
import org.elasticsearch.client.Client;
1515
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1616
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
17+
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.cluster.node.DiscoveryNodes;
1819
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.common.inject.Module;
2021
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2122
import org.elasticsearch.common.settings.ClusterSettings;
2223
import org.elasticsearch.common.settings.IndexScopedSettings;
2324
import org.elasticsearch.common.settings.Setting;
25+
import org.elasticsearch.common.settings.Setting.Property;
2426
import org.elasticsearch.common.settings.Settings;
2527
import org.elasticsearch.common.settings.SettingsFilter;
2628
import org.elasticsearch.common.settings.SettingsModule;
@@ -40,6 +42,7 @@
4042
import org.elasticsearch.threadpool.ExecutorBuilder;
4143
import org.elasticsearch.threadpool.FixedExecutorBuilder;
4244
import org.elasticsearch.threadpool.ThreadPool;
45+
import org.elasticsearch.transport.RemoteClusterService;
4346
import org.elasticsearch.watcher.ResourceWatcherService;
4447
import org.elasticsearch.xpack.core.XPackPlugin;
4548
import org.elasticsearch.xpack.core.XPackSettings;
@@ -138,6 +141,23 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
138141
Setting.Property.Dynamic
139142
);
140143

144+
/**
145+
* Node attributes for transform, automatically created and retrievable via cluster state.
146+
* These attributes should never be set directly, use the node setting counter parts instead.
147+
*/
148+
public static final String TRANSFORM_ENABLED_NODE_ATTR = "transform.node";
149+
public static final String TRANSFORM_REMOTE_ENABLED_NODE_ATTR = "transform.remote_connect";
150+
151+
/**
152+
* Setting whether transform (the coordinator task) can run on this node and REST API's are available,
153+
* respects xpack.transform.enabled (for the whole plugin) as fallback
154+
*/
155+
public static final Setting<Boolean> TRANSFORM_ENABLED_NODE = Setting.boolSetting(
156+
"node.transform",
157+
settings -> Boolean.toString(XPackSettings.TRANSFORM_ENABLED.get(settings) && DiscoveryNode.isDataNode(settings)),
158+
Property.NodeScope
159+
);
160+
141161
public Transform(Settings settings) {
142162
this.settings = settings;
143163
this.enabled = XPackSettings.TRANSFORM_ENABLED.get(settings);
@@ -231,7 +251,13 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
231251
return emptyList();
232252
}
233253

234-
FixedExecutorBuilder indexing = new FixedExecutorBuilder(settings, TASK_THREAD_POOL_NAME, 4, 4, "transform.task_thread_pool");
254+
FixedExecutorBuilder indexing = new FixedExecutorBuilder(
255+
settings,
256+
TASK_THREAD_POOL_NAME,
257+
4,
258+
4,
259+
"transform.task_thread_pool"
260+
);
235261

236262
return Collections.singletonList(indexing);
237263
}
@@ -304,13 +330,44 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
304330
// the transform services should have been created
305331
assert transformServices.get() != null;
306332

307-
return Collections.singletonList(new TransformPersistentTasksExecutor(client, transformServices.get(), threadPool, clusterService,
308-
settingsModule.getSettings(), expressionResolver));
333+
return Collections.singletonList(
334+
new TransformPersistentTasksExecutor(
335+
client,
336+
transformServices.get(),
337+
threadPool,
338+
clusterService,
339+
settingsModule.getSettings(),
340+
expressionResolver
341+
)
342+
);
309343
}
310344

311345
@Override
312346
public List<Setting<?>> getSettings() {
313-
return Collections.singletonList(NUM_FAILURE_RETRIES_SETTING);
347+
return Collections.unmodifiableList(Arrays.asList(TRANSFORM_ENABLED_NODE, NUM_FAILURE_RETRIES_SETTING));
348+
}
349+
350+
@Override
351+
public Settings additionalSettings() {
352+
String transformEnabledNodeAttribute = "node.attr." + TRANSFORM_ENABLED_NODE_ATTR;
353+
String transformRemoteEnabledNodeAttribute = "node.attr." + TRANSFORM_REMOTE_ENABLED_NODE_ATTR;
354+
355+
if (settings.get(transformEnabledNodeAttribute) != null || settings.get(transformRemoteEnabledNodeAttribute) != null) {
356+
throw new IllegalArgumentException(
357+
"Directly setting transform node attributes is not permitted, please use the documented node settings instead"
358+
);
359+
}
360+
361+
if (enabled == false) {
362+
return Settings.EMPTY;
363+
}
364+
365+
Settings.Builder additionalSettings = Settings.builder();
366+
367+
additionalSettings.put(transformEnabledNodeAttribute, TRANSFORM_ENABLED_NODE.get(settings));
368+
additionalSettings.put(transformRemoteEnabledNodeAttribute, RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings));
369+
370+
return additionalSettings.build();
314371
}
315372

316373
@Override

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,9 @@ protected void masterOperation(
263263
);
264264
return;
265265
}
266-
transformTaskHolder.set(createTransform(config.getId(), config.getVersion(), config.getFrequency()));
266+
transformTaskHolder.set(
267+
createTransform(config.getId(), config.getVersion(), config.getFrequency(), config.getSource().requiresRemoteCluster())
268+
);
267269
transformConfigHolder.set(config);
268270
if (config.getDestination().getPipeline() != null) {
269271
if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
@@ -309,8 +311,13 @@ protected ClusterBlockException checkBlock(StartTransformAction.Request request,
309311
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
310312
}
311313

312-
private static TransformTaskParams createTransform(String transformId, Version transformVersion, TimeValue frequency) {
313-
return new TransformTaskParams(transformId, transformVersion, frequency);
314+
private static TransformTaskParams createTransform(
315+
String transformId,
316+
Version transformVersion,
317+
TimeValue frequency,
318+
Boolean requiresRemoteCluster
319+
) {
320+
return new TransformTaskParams(transformId, transformVersion, frequency, requiresRemoteCluster);
314321
}
315322

316323
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)