Skip to content

Commit 81549e6

Browse files
committed
[ML] Job In Index: Migrate config from the clusterstate (#35834)
Migrate ML configuration from clusterstate to index for closed jobs only once all nodes are v6.6.0 or higher
1 parent 29fc10d commit 81549e6

File tree

16 files changed

+980
-105
lines changed

16 files changed

+980
-105
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -405,14 +405,14 @@ private void checkDatafeedIsStopped(Supplier<String> msg, String datafeedId, Per
405405
}
406406
}
407407

408-
private Builder putJobs(Collection<Job> jobs) {
408+
public Builder putJobs(Collection<Job> jobs) {
409409
for (Job job : jobs) {
410410
putJob(job, true);
411411
}
412412
return this;
413413
}
414414

415-
private Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
415+
public Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
416416
for (DatafeedConfig datafeed : datafeeds) {
417417
this.datafeeds.put(datafeed.getId(), datafeed);
418418
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,23 @@ public static Set<String> openJobIds(@Nullable PersistentTasksCustomMetaData tas
9898
.collect(Collectors.toSet());
9999
}
100100

101+
/**
102+
* The datafeed Ids of started datafeed tasks
103+
*
104+
* @param tasks Persistent tasks. If null an empty set is returned.
105+
* @return The Ids of running datafeed tasks
106+
*/
107+
public static Set<String> startedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks) {
108+
if (tasks == null) {
109+
return Collections.emptySet();
110+
}
111+
112+
return tasks.findTasks(DATAFEED_TASK_NAME, task -> true)
113+
.stream()
114+
.map(t -> t.getId().substring(DATAFEED_TASK_ID_PREFIX.length()))
115+
.collect(Collectors.toSet());
116+
}
117+
101118
/**
102119
* Is there an ml anomaly detector job task for the job {@code jobId}?
103120
* @param jobId The job id

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,26 @@ public void testOpenJobIds_GivenNull() {
9191
assertThat(MlTasks.openJobIds(null), empty());
9292
}
9393

94+
public void testStartedDatafeedIds() {
95+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
96+
assertThat(MlTasks.openJobIds(tasksBuilder.build()), empty());
97+
98+
tasksBuilder.addTask(MlTasks.jobTaskId("job-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"),
99+
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
100+
tasksBuilder.addTask(MlTasks.datafeedTaskId("df1"), MlTasks.DATAFEED_TASK_NAME,
101+
new StartDatafeedAction.DatafeedParams("df1", 0L),
102+
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
103+
tasksBuilder.addTask(MlTasks.datafeedTaskId("df2"), MlTasks.DATAFEED_TASK_NAME,
104+
new StartDatafeedAction.DatafeedParams("df2", 0L),
105+
new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment"));
106+
107+
assertThat(MlTasks.startedDatafeedIds(tasksBuilder.build()), containsInAnyOrder("df1", "df2"));
108+
}
109+
110+
public void testStartedDatafeedIds_GivenNull() {
111+
assertThat(MlTasks.startedDatafeedIds(null), empty());
112+
}
113+
94114
public void testTaskExistsForJob() {
95115
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
96116
assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build()));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
438438
jobDataCountsPersister,
439439
datafeedManager,
440440
auditor,
441-
new MlAssignmentNotifier(auditor, clusterService),
441+
new MlAssignmentNotifier(auditor, threadPool, client, clusterService),
442442
memoryTracker
443443
);
444444
}
@@ -453,7 +453,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
453453

454454
return Arrays.asList(
455455
new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get(),
456-
memoryTracker.get()),
456+
memoryTracker.get(), client),
457457
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor( datafeedManager.get())
458458
);
459459
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
10+
import org.elasticsearch.Version;
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.client.Client;
1013
import org.elasticsearch.cluster.ClusterChangedEvent;
14+
import org.elasticsearch.cluster.ClusterState;
1115
import org.elasticsearch.cluster.ClusterStateListener;
1216
import org.elasticsearch.cluster.LocalNodeMasterListener;
1317
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -30,12 +34,23 @@ public class MlAssignmentNotifier implements ClusterStateListener, LocalNodeMast
3034

3135
private final Auditor auditor;
3236
private final ClusterService clusterService;
33-
37+
private final MlConfigMigrator mlConfigMigrator;
38+
private final ThreadPool threadPool;
3439
private final AtomicBoolean enabled = new AtomicBoolean(false);
3540

36-
MlAssignmentNotifier(Auditor auditor, ClusterService clusterService) {
41+
MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) {
42+
this.auditor = auditor;
43+
this.clusterService = clusterService;
44+
this.mlConfigMigrator = new MlConfigMigrator(client, clusterService);
45+
this.threadPool = threadPool;
46+
clusterService.addLocalNodeMasterListener(this);
47+
}
48+
49+
MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, MlConfigMigrator mlConfigMigrator, ClusterService clusterService) {
3750
this.auditor = auditor;
3851
this.clusterService = clusterService;
52+
this.mlConfigMigrator = mlConfigMigrator;
53+
this.threadPool = threadPool;
3954
clusterService.addLocalNodeMasterListener(this);
4055
}
4156

@@ -72,6 +87,25 @@ public void clusterChanged(ClusterChangedEvent event) {
7287
return;
7388
}
7489

90+
Version minNodeVersion = event.state().nodes().getMinNodeVersion();
91+
if (minNodeVersion.onOrAfter(Version.V_6_6_0)) {
92+
// ok to migrate
93+
mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap(
94+
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())),
95+
e -> {
96+
logger.error("error migrating ml configurations", e);
97+
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state()));
98+
}
99+
));
100+
} else {
101+
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state()));
102+
}
103+
104+
}
105+
106+
private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, PersistentTasksCustomMetaData previous,
107+
ClusterState state) {
108+
75109
for (PersistentTask<?> currentTask : current.tasks()) {
76110
Assignment currentAssignment = currentTask.getAssignment();
77111
PersistentTask<?> previousTask = previous != null ? previous.getTask(currentTask.getId()) : null;
@@ -84,7 +118,7 @@ public void clusterChanged(ClusterChangedEvent event) {
84118
if (currentAssignment.getExecutorNode() == null) {
85119
auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]");
86120
} else {
87-
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
121+
DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode());
88122
auditor.info(jobId, "Opening job on node [" + node.toString() + "]");
89123
}
90124
} else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) {
@@ -98,7 +132,7 @@ public void clusterChanged(ClusterChangedEvent event) {
98132
auditor.warning(jobId, msg);
99133
}
100134
} else {
101-
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
135+
DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode());
102136
if (jobId != null) {
103137
auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]");
104138
}

0 commit comments

Comments
 (0)