Skip to content

Commit b95ee7e

Browse files
authored
[7.x] [ML][Data Frame] using transform creation version for node assignment (#43764) (#43843)
* [ML][Data Frame] using transform creation version for node assignment (#43764) * [ML][Data Frame] using transform creation version for node assignment * removing unused imports * Addressing PR comment * adjusing for backport
1 parent 82c1ddc commit b95ee7e

File tree

6 files changed

+181
-11
lines changed

6 files changed

+181
-11
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import org.elasticsearch.Version;
1010
import org.elasticsearch.cluster.AbstractDiffable;
11+
import org.elasticsearch.common.ParseField;
1112
import org.elasticsearch.common.io.stream.StreamInput;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
1314
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@@ -22,22 +23,35 @@
2223
public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> implements XPackPlugin.XPackPersistentTaskParams {
2324

2425
public static final String NAME = DataFrameField.TASK_NAME;
26+
public static final ParseField VERSION = new ParseField(DataFrameField.VERSION);
2527

2628
private final String transformId;
29+
private final Version version;
2730

2831
public static final ConstructingObjectParser<DataFrameTransform, Void> PARSER = new ConstructingObjectParser<>(NAME,
29-
a -> new DataFrameTransform((String) a[0]));
32+
a -> new DataFrameTransform((String) a[0], (String) a[1]));
3033

3134
static {
3235
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID);
36+
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), VERSION);
3337
}
3438

35-
public DataFrameTransform(String transformId) {
39+
private DataFrameTransform(String transformId, String version) {
40+
this(transformId, version == null ? null : Version.fromString(version));
41+
}
42+
43+
public DataFrameTransform(String transformId, Version version) {
3644
this.transformId = transformId;
45+
this.version = version == null ? Version.V_7_2_0 : version;
3746
}
3847

3948
public DataFrameTransform(StreamInput in) throws IOException {
4049
this.transformId = in.readString();
50+
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
51+
this.version = Version.readVersion(in);
52+
} else {
53+
this.version = Version.V_7_2_0;
54+
}
4155
}
4256

4357
@Override
@@ -53,12 +67,16 @@ public Version getMinimalSupportedVersion() {
5367
@Override
5468
public void writeTo(StreamOutput out) throws IOException {
5569
out.writeString(transformId);
70+
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
71+
Version.writeVersion(version, out);
72+
}
5673
}
5774

5875
@Override
5976
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
6077
builder.startObject();
6178
builder.field(DataFrameField.ID.getPreferredName(), transformId);
79+
builder.field(VERSION.getPreferredName(), version);
6280
builder.endObject();
6381
return builder;
6482
}
@@ -67,6 +85,10 @@ public String getId() {
6785
return transformId;
6886
}
6987

88+
public Version getVersion() {
89+
return version;
90+
}
91+
7092
public static DataFrameTransform fromXContent(XContentParser parser) throws IOException {
7193
return PARSER.parse(parser, null);
7294
}
@@ -83,11 +105,11 @@ public boolean equals(Object other) {
83105

84106
DataFrameTransform that = (DataFrameTransform) other;
85107

86-
return Objects.equals(this.transformId, that.transformId);
108+
return Objects.equals(this.transformId, that.transformId) && Objects.equals(this.version, that.version);
87109
}
88110

89111
@Override
90112
public int hashCode() {
91-
return Objects.hash(transformId);
113+
return Objects.hash(transformId, version);
92114
}
93115
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.core.dataframe.transforms;
8+
9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.Writeable.Reader;
13+
import org.elasticsearch.common.xcontent.XContentParser;
14+
15+
import java.io.IOException;
16+
17+
import static org.hamcrest.Matchers.equalTo;
18+
19+
public class DataFrameTransformTests extends AbstractSerializingDataFrameTestCase<DataFrameTransform> {
20+
21+
@Override
22+
protected DataFrameTransform doParseInstance(XContentParser parser) throws IOException {
23+
return DataFrameTransform.PARSER.apply(parser, null);
24+
}
25+
26+
@Override
27+
protected DataFrameTransform createTestInstance() {
28+
return new DataFrameTransform(randomAlphaOfLength(10), randomBoolean() ? null : Version.CURRENT);
29+
}
30+
31+
@Override
32+
protected Reader<DataFrameTransform> instanceReader() {
33+
return DataFrameTransform::new;
34+
}
35+
36+
public void testBackwardsSerialization() throws IOException {
37+
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
38+
DataFrameTransform transformTask = createTestInstance();
39+
try (BytesStreamOutput output = new BytesStreamOutput()) {
40+
output.setVersion(Version.V_7_2_0);
41+
transformTask.writeTo(output);
42+
try (StreamInput in = output.bytes().streamInput()) {
43+
in.setVersion(Version.V_7_2_0);
44+
// Since the old version does not have the version serialized, the version NOW is 7.2.0
45+
DataFrameTransform streamedTask = new DataFrameTransform(in);
46+
assertThat(streamedTask.getVersion(), equalTo(Version.V_7_2_0));
47+
assertThat(streamedTask.getId(), equalTo(transformTask.getId()));
48+
}
49+
}
50+
}
51+
}
52+
}

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.logging.log4j.Logger;
1111
import org.elasticsearch.ElasticsearchException;
1212
import org.elasticsearch.ElasticsearchStatusException;
13+
import org.elasticsearch.Version;
1314
import org.elasticsearch.action.ActionListener;
1415
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
1516
import org.elasticsearch.action.support.ActionFilters;
@@ -49,6 +50,7 @@
4950
import java.time.Clock;
5051
import java.util.Collection;
5152
import java.util.Map;
53+
import java.util.concurrent.atomic.AtomicReference;
5254
import java.util.function.Consumer;
5355
import java.util.function.Predicate;
5456

@@ -101,12 +103,14 @@ protected void masterOperation(StartDataFrameTransformAction.Request request,
101103
listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME));
102104
return;
103105
}
104-
final DataFrameTransform transformTask = createDataFrameTransform(request.getId(), threadPool);
106+
final AtomicReference<DataFrameTransform> transformTaskHolder = new AtomicReference<>();
105107

106-
// <3> Wait for the allocated task's state to STARTED
108+
// <4> Wait for the allocated task's state to STARTED
107109
ActionListener<PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform>> newPersistentTaskActionListener =
108110
ActionListener.wrap(
109111
task -> {
112+
DataFrameTransform transformTask = transformTaskHolder.get();
113+
assert transformTask != null;
110114
waitForDataFrameTaskStarted(task.getId(),
111115
transformTask,
112116
request.timeout(),
@@ -120,6 +124,8 @@ protected void masterOperation(StartDataFrameTransformAction.Request request,
120124
// <3> Create the task in cluster state so that it will start executing on the node
121125
ActionListener<Void> createOrGetIndexListener = ActionListener.wrap(
122126
unused -> {
127+
DataFrameTransform transformTask = transformTaskHolder.get();
128+
assert transformTask != null;
123129
PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> existingTask =
124130
getExistingTask(transformTask.getId(), state);
125131
if (existingTask == null) {
@@ -178,6 +184,8 @@ protected void masterOperation(StartDataFrameTransformAction.Request request,
178184
));
179185
return;
180186
}
187+
188+
transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion()));
181189
final String destinationIndex = config.getDestination().getIndex();
182190
String[] dest = indexNameExpressionResolver.concreteIndexNames(state,
183191
IndicesOptions.lenientExpandOpen(),
@@ -246,8 +254,8 @@ protected ClusterBlockException checkBlock(StartDataFrameTransformAction.Request
246254
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
247255
}
248256

249-
private static DataFrameTransform createDataFrameTransform(String transformId, ThreadPool threadPool) {
250-
return new DataFrameTransform(transformId);
257+
private static DataFrameTransform createDataFrameTransform(String transformId, Version transformVersion) {
258+
return new DataFrameTransform(transformId, transformVersion);
251259
}
252260

253261
@SuppressWarnings("unchecked")

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.client.Client;
1616
import org.elasticsearch.cluster.ClusterState;
1717
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
18+
import org.elasticsearch.cluster.node.DiscoveryNode;
1819
import org.elasticsearch.cluster.routing.IndexRoutingTable;
1920
import org.elasticsearch.common.Nullable;
2021
import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -84,7 +85,10 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(DataFrameTransform
8485
logger.debug(reason);
8586
return new PersistentTasksCustomMetaData.Assignment(null, reason);
8687
}
87-
return super.getAssignment(params, clusterState);
88+
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, (node) ->
89+
node.isDataNode() && node.getVersion().onOrAfter(params.getVersion())
90+
);
91+
return discoveryNode == null ? NO_NODE_FOUND : new PersistentTasksCustomMetaData.Assignment(discoveryNode.getId(), "");
8892
}
8993

9094
static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState) {

x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ public void testDataframeNodes() {
3131

3232
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
3333
tasksBuilder.addTask(dataFrameIdFoo,
34-
DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdFoo),
34+
DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdFoo, Version.CURRENT),
3535
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
3636
tasksBuilder.addTask(dataFrameIdBar,
37-
DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdBar),
37+
DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdBar, Version.CURRENT),
3838
new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment"));
3939
tasksBuilder.addTask("test-task1", "testTasks", new PersistentTaskParams() {
4040
@Override

x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@
77
package org.elasticsearch.xpack.dataframe.transforms;
88

99
import org.elasticsearch.Version;
10+
import org.elasticsearch.client.Client;
1011
import org.elasticsearch.cluster.ClusterName;
1112
import org.elasticsearch.cluster.ClusterState;
1213
import org.elasticsearch.cluster.metadata.IndexMetaData;
1314
import org.elasticsearch.cluster.metadata.MetaData;
15+
import org.elasticsearch.cluster.node.DiscoveryNode;
16+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
17+
import org.elasticsearch.cluster.node.DiscoveryNodes;
1418
import org.elasticsearch.cluster.routing.IndexRoutingTable;
1519
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
1620
import org.elasticsearch.cluster.routing.RecoverySource;
@@ -20,14 +24,94 @@
2024
import org.elasticsearch.common.settings.Settings;
2125
import org.elasticsearch.index.Index;
2226
import org.elasticsearch.index.shard.ShardId;
27+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2328
import org.elasticsearch.test.ESTestCase;
29+
import org.elasticsearch.threadpool.ThreadPool;
30+
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
31+
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
32+
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
33+
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
2434
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
35+
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
2536

2637
import java.util.ArrayList;
38+
import java.util.Arrays;
39+
import java.util.Collections;
40+
import java.util.HashSet;
2741
import java.util.List;
2842

43+
import static org.hamcrest.Matchers.equalTo;
44+
import static org.mockito.Mockito.mock;
45+
2946
public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase {
3047

48+
public void testNodeVersionAssignment() {
49+
MetaData.Builder metaData = MetaData.builder();
50+
RoutingTable.Builder routingTable = RoutingTable.builder();
51+
addIndices(metaData, routingTable);
52+
PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder()
53+
.addTask("data-frame-task-1",
54+
DataFrameTransform.NAME,
55+
new DataFrameTransform("data-frame-task-1", Version.CURRENT),
56+
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", ""))
57+
.addTask("data-frame-task-2",
58+
DataFrameTransform.NAME,
59+
new DataFrameTransform("data-frame-task-2", Version.CURRENT),
60+
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", ""))
61+
.addTask("data-frame-task-3",
62+
DataFrameTransform.NAME,
63+
new DataFrameTransform("data-frame-task-3", Version.CURRENT),
64+
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", ""));
65+
66+
PersistentTasksCustomMetaData pTasks = pTasksBuilder.build();
67+
68+
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, pTasks);
69+
70+
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder()
71+
.add(new DiscoveryNode("past-data-node-1",
72+
buildNewFakeTransportAddress(),
73+
Collections.emptyMap(),
74+
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
75+
Version.V_7_2_0))
76+
.add(new DiscoveryNode("current-data-node-with-2-tasks",
77+
buildNewFakeTransportAddress(),
78+
Collections.emptyMap(),
79+
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
80+
Version.CURRENT))
81+
.add(new DiscoveryNode("non-data-node-1",
82+
buildNewFakeTransportAddress(),
83+
Collections.emptyMap(),
84+
Collections.singleton(DiscoveryNodeRole.MASTER_ROLE),
85+
Version.CURRENT))
86+
.add(new DiscoveryNode("current-data-node-with-1-tasks",
87+
buildNewFakeTransportAddress(),
88+
Collections.emptyMap(),
89+
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
90+
Version.CURRENT));
91+
92+
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
93+
.nodes(nodes);
94+
csBuilder.routingTable(routingTable.build());
95+
csBuilder.metaData(metaData);
96+
97+
ClusterState cs = csBuilder.build();
98+
Client client = mock(Client.class);
99+
DataFrameTransformsConfigManager transformsConfigManager = new DataFrameTransformsConfigManager(client, xContentRegistry());
100+
DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService = new DataFrameTransformsCheckpointService(client,
101+
transformsConfigManager);
102+
103+
DataFrameTransformPersistentTasksExecutor executor = new DataFrameTransformPersistentTasksExecutor(client,
104+
transformsConfigManager,
105+
dataFrameTransformsCheckpointService, mock(SchedulerEngine.class),
106+
new DataFrameAuditor(client, ""),
107+
mock(ThreadPool.class));
108+
109+
assertThat(executor.getAssignment(new DataFrameTransform("new-task-id", Version.CURRENT), cs).getExecutorNode(),
110+
equalTo("current-data-node-with-1-tasks"));
111+
assertThat(executor.getAssignment(new DataFrameTransform("new-old-task-id", Version.V_7_2_0), cs).getExecutorNode(),
112+
equalTo("past-data-node-1"));
113+
}
114+
31115
public void testVerifyIndicesPrimaryShardsAreActive() {
32116
MetaData.Builder metaData = MetaData.builder();
33117
RoutingTable.Builder routingTable = RoutingTable.builder();

0 commit comments

Comments
 (0)