Skip to content

Commit 93044e2

Browse files
authored
[ML] Job In Index: Migrate config from the clusterstate (#35834)
Migrate ML configuration from clusterstate to index for closed jobs only once all nodes are v6.6.0 or higher
1 parent 1a58e69 commit 93044e2

File tree

19 files changed

+1011
-137
lines changed

19 files changed

+1011
-137
lines changed

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/XPackIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public void testIndexTemplatesCreated() throws Exception {
8989
if (masterIsNewVersion()) {
9090
// Everything else waits until the master is upgraded to create its templates
9191
expectedTemplates.add(".ml-anomalies-");
92+
expectedTemplates.add(".ml-config");
9293
expectedTemplates.add(".ml-meta");
9394
expectedTemplates.add(".ml-notifications");
9495
expectedTemplates.add(".ml-state");

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -410,14 +410,14 @@ private void checkDatafeedIsStopped(Supplier<String> msg, String datafeedId, Per
410410
}
411411
}
412412

413-
private Builder putJobs(Collection<Job> jobs) {
413+
public Builder putJobs(Collection<Job> jobs) {
414414
for (Job job : jobs) {
415415
putJob(job, true);
416416
}
417417
return this;
418418
}
419419

420-
private Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
420+
public Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
421421
for (DatafeedConfig datafeed : datafeeds) {
422422
this.datafeeds.put(datafeed.getId(), datafeed);
423423
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,23 @@ public static Set<String> openJobIds(@Nullable PersistentTasksCustomMetaData tas
9898
.collect(Collectors.toSet());
9999
}
100100

101+
/**
102+
* The datafeed Ids of started datafeed tasks
103+
*
104+
* @param tasks Persistent tasks. If null an empty set is returned.
105+
* @return The Ids of running datafeed tasks
106+
*/
107+
public static Set<String> startedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks) {
108+
if (tasks == null) {
109+
return Collections.emptySet();
110+
}
111+
112+
return tasks.findTasks(DATAFEED_TASK_NAME, task -> true)
113+
.stream()
114+
.map(t -> t.getId().substring(DATAFEED_TASK_ID_PREFIX.length()))
115+
.collect(Collectors.toSet());
116+
}
117+
101118
/**
102119
* Is there an ml anomaly detector job task for the job {@code jobId}?
103120
* @param jobId The job id

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,26 @@ public void testOpenJobIds_GivenNull() {
9191
assertThat(MlTasks.openJobIds(null), empty());
9292
}
9393

94+
public void testStartedDatafeedIds() {
95+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
96+
assertThat(MlTasks.openJobIds(tasksBuilder.build()), empty());
97+
98+
tasksBuilder.addTask(MlTasks.jobTaskId("job-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"),
99+
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
100+
tasksBuilder.addTask(MlTasks.datafeedTaskId("df1"), MlTasks.DATAFEED_TASK_NAME,
101+
new StartDatafeedAction.DatafeedParams("df1", 0L),
102+
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
103+
tasksBuilder.addTask(MlTasks.datafeedTaskId("df2"), MlTasks.DATAFEED_TASK_NAME,
104+
new StartDatafeedAction.DatafeedParams("df2", 0L),
105+
new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment"));
106+
107+
assertThat(MlTasks.startedDatafeedIds(tasksBuilder.build()), containsInAnyOrder("df1", "df2"));
108+
}
109+
110+
public void testStartedDatafeedIds_GivenNull() {
111+
assertThat(MlTasks.startedDatafeedIds(null), empty());
112+
}
113+
94114
public void testTaskExistsForJob() {
95115
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
96116
assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build()));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
443443
jobDataCountsPersister,
444444
datafeedManager,
445445
auditor,
446-
new MlAssignmentNotifier(auditor, clusterService),
446+
new MlAssignmentNotifier(auditor, threadPool, client, clusterService),
447447
memoryTracker
448448
);
449449
}
@@ -458,7 +458,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
458458

459459
return Arrays.asList(
460460
new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get(),
461-
memoryTracker.get()),
461+
memoryTracker.get(), client),
462462
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, datafeedManager.get())
463463
);
464464
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
10+
import org.elasticsearch.Version;
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.client.Client;
1013
import org.elasticsearch.cluster.ClusterChangedEvent;
14+
import org.elasticsearch.cluster.ClusterState;
1115
import org.elasticsearch.cluster.ClusterStateListener;
1216
import org.elasticsearch.cluster.LocalNodeMasterListener;
1317
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -29,12 +33,23 @@ public class MlAssignmentNotifier implements ClusterStateListener, LocalNodeMast
2933

3034
private final Auditor auditor;
3135
private final ClusterService clusterService;
32-
36+
private final MlConfigMigrator mlConfigMigrator;
37+
private final ThreadPool threadPool;
3338
private final AtomicBoolean enabled = new AtomicBoolean(false);
3439

35-
MlAssignmentNotifier(Auditor auditor, ClusterService clusterService) {
40+
MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) {
41+
this.auditor = auditor;
42+
this.clusterService = clusterService;
43+
this.mlConfigMigrator = new MlConfigMigrator(client, clusterService);
44+
this.threadPool = threadPool;
45+
clusterService.addLocalNodeMasterListener(this);
46+
}
47+
48+
MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, MlConfigMigrator mlConfigMigrator, ClusterService clusterService) {
3649
this.auditor = auditor;
3750
this.clusterService = clusterService;
51+
this.mlConfigMigrator = mlConfigMigrator;
52+
this.threadPool = threadPool;
3853
clusterService.addLocalNodeMasterListener(this);
3954
}
4055

@@ -71,6 +86,25 @@ public void clusterChanged(ClusterChangedEvent event) {
7186
return;
7287
}
7388

89+
Version minNodeVersion = event.state().nodes().getMinNodeVersion();
90+
if (minNodeVersion.onOrAfter(Version.V_6_6_0)) {
91+
// ok to migrate
92+
mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap(
93+
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())),
94+
e -> {
95+
logger.error("error migrating ml configurations", e);
96+
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state()));
97+
}
98+
));
99+
} else {
100+
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state()));
101+
}
102+
103+
}
104+
105+
private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, PersistentTasksCustomMetaData previous,
106+
ClusterState state) {
107+
74108
for (PersistentTask<?> currentTask : current.tasks()) {
75109
Assignment currentAssignment = currentTask.getAssignment();
76110
PersistentTask<?> previousTask = previous != null ? previous.getTask(currentTask.getId()) : null;
@@ -83,7 +117,7 @@ public void clusterChanged(ClusterChangedEvent event) {
83117
if (currentAssignment.getExecutorNode() == null) {
84118
auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]");
85119
} else {
86-
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
120+
DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode());
87121
auditor.info(jobId, "Opening job on node [" + node.toString() + "]");
88122
}
89123
} else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) {
@@ -97,7 +131,7 @@ public void clusterChanged(ClusterChangedEvent event) {
97131
auditor.warning(jobId, msg);
98132
}
99133
} else {
100-
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
134+
DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode());
101135
if (jobId != null) {
102136
auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]");
103137
}

0 commit comments

Comments
 (0)