Skip to content

Commit ec4601e

Browse files
authored
[ML] Snapshot ml configs before migrating (#36645)
1 parent 88b14bd commit ec4601e

File tree

5 files changed

+153
-66
lines changed

5 files changed

+153
-66
lines changed

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,6 @@ public final class XPackRestTestHelper {
4343
private XPackRestTestHelper() {
4444
}
4545

46-
/**
47-
* Waits for the Machine Learning templates to be created
48-
* and check the version is up to date
49-
*/
50-
51-
5246
/**
5347
* For each template name wait for the template to be created and
5448
* for the template version to be equal to the master node version.
@@ -96,5 +90,4 @@ public static void waitForTemplates(RestClient client, List<String> templateName
9690
});
9791
}
9892
}
99-
10093
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.cluster.ClusterChangedEvent;
1313
import org.elasticsearch.cluster.ClusterState;
1414
import org.elasticsearch.cluster.ClusterStateListener;
15-
import org.elasticsearch.cluster.LocalNodeMasterListener;
1615
import org.elasticsearch.cluster.node.DiscoveryNode;
1716
import org.elasticsearch.cluster.service.ClusterService;
1817
import org.elasticsearch.common.settings.Settings;
@@ -26,57 +25,39 @@
2625
import org.elasticsearch.xpack.ml.notifications.Auditor;
2726

2827
import java.util.Objects;
29-
import java.util.concurrent.atomic.AtomicBoolean;
3028

31-
public class MlAssignmentNotifier implements ClusterStateListener, LocalNodeMasterListener {
29+
public class MlAssignmentNotifier implements ClusterStateListener {
3230
private static final Logger logger = LogManager.getLogger(MlAssignmentNotifier.class);
3331

3432
private final Auditor auditor;
35-
private final ClusterService clusterService;
3633
private final MlConfigMigrator mlConfigMigrator;
3734
private final ThreadPool threadPool;
38-
private final AtomicBoolean enabled = new AtomicBoolean(false);
3935

4036
MlAssignmentNotifier(Settings settings, Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) {
4137
this.auditor = auditor;
42-
this.clusterService = clusterService;
4338
this.mlConfigMigrator = new MlConfigMigrator(settings, client, clusterService);
4439
this.threadPool = threadPool;
45-
clusterService.addLocalNodeMasterListener(this);
40+
clusterService.addListener(this);
4641
}
4742

4843
MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, MlConfigMigrator mlConfigMigrator, ClusterService clusterService) {
4944
this.auditor = auditor;
50-
this.clusterService = clusterService;
5145
this.mlConfigMigrator = mlConfigMigrator;
5246
this.threadPool = threadPool;
53-
clusterService.addLocalNodeMasterListener(this);
47+
clusterService.addListener(this);
5448
}
5549

56-
@Override
57-
public void onMaster() {
58-
if (enabled.compareAndSet(false, true)) {
59-
clusterService.addListener(this);
60-
}
61-
}
62-
63-
@Override
64-
public void offMaster() {
65-
if (enabled.compareAndSet(true, false)) {
66-
clusterService.removeListener(this);
67-
}
68-
}
69-
70-
@Override
71-
public String executorName() {
50+
private String executorName() {
7251
return ThreadPool.Names.GENERIC;
7352
}
7453

7554
@Override
7655
public void clusterChanged(ClusterChangedEvent event) {
77-
if (enabled.get() == false) {
56+
57+
if (event.localNodeMaster() == false) {
7858
return;
7959
}
60+
8061
if (event.metaDataChanged() == false) {
8162
return;
8263
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,14 @@
99
import org.apache.logging.log4j.Logger;
1010
import org.elasticsearch.Version;
1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.DocWriteRequest;
13+
import org.elasticsearch.action.DocWriteResponse;
1214
import org.elasticsearch.action.bulk.BulkItemResponse;
1315
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1416
import org.elasticsearch.action.bulk.BulkResponse;
1517
import org.elasticsearch.action.index.IndexRequest;
18+
import org.elasticsearch.action.index.IndexRequestBuilder;
19+
import org.elasticsearch.action.index.IndexResponse;
1620
import org.elasticsearch.action.support.WriteRequest;
1721
import org.elasticsearch.client.Client;
1822
import org.elasticsearch.cluster.ClusterState;
@@ -31,12 +35,14 @@
3135
import org.elasticsearch.xpack.core.ml.job.config.Job;
3236
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
3337
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
38+
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
3439
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
3540
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
3641

3742
import java.io.IOException;
3843
import java.util.ArrayList;
3944
import java.util.Collection;
45+
import java.util.Collections;
4046
import java.util.HashMap;
4147
import java.util.HashSet;
4248
import java.util.Iterator;
@@ -90,12 +96,14 @@ public class MlConfigMigrator {
9096
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
9197

9298
private final AtomicBoolean migrationInProgress;
99+
private final AtomicBoolean firstTime;
93100

94101
public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) {
95102
this.client = Objects.requireNonNull(client);
96103
this.clusterService = Objects.requireNonNull(clusterService);
97104
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
98105
this.migrationInProgress = new AtomicBoolean(false);
106+
this.firstTime = new AtomicBoolean(true);
99107
}
100108

101109
/**
@@ -127,9 +135,6 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
127135
return;
128136
}
129137

130-
131-
logger.debug("migrating ml configurations");
132-
133138
Collection<DatafeedConfig> stoppedDatafeeds = stoppedDatafeedConfigs(clusterState);
134139
Map<String, Job> eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
135140
.map(MlConfigMigrator::updateJobForMigration)
@@ -148,19 +153,36 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
148153
}
149154
);
150155

156+
if (firstTime.get()) {
157+
snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap(
158+
response -> {
159+
firstTime.set(false);
160+
migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress);
161+
},
162+
unMarkMigrationInProgress::onFailure
163+
));
164+
return;
165+
}
166+
167+
migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress);
168+
}
169+
170+
private void migrate(JobsAndDatafeeds jobsAndDatafeedsToMigrate, ActionListener<Boolean> listener) {
151171
if (jobsAndDatafeedsToMigrate.totalCount() == 0) {
152-
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
172+
listener.onResponse(Boolean.FALSE);
153173
return;
154174
}
155175

176+
logger.debug("migrating ml configurations");
177+
156178
writeConfigToIndex(jobsAndDatafeedsToMigrate.datafeedConfigs, jobsAndDatafeedsToMigrate.jobs, ActionListener.wrap(
157179
failedDocumentIds -> {
158180
List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.jobs);
159181
List<String> successfulDatafeedWrites =
160182
filterFailedDatafeedConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.datafeedConfigs);
161-
removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, unMarkMigrationInProgress);
183+
removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, listener);
162184
},
163-
unMarkMigrationInProgress::onFailure
185+
listener::onFailure
164186
));
165187
}
166188

@@ -300,6 +322,45 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To
300322
return indexRequest;
301323
}
302324

325+
326+
// public for testing
327+
public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listener) {
328+
329+
if (mlMetadata.getJobs().isEmpty() && mlMetadata.getDatafeeds().isEmpty()) {
330+
listener.onResponse(Boolean.TRUE);
331+
return;
332+
}
333+
334+
logger.debug("taking a snapshot of mlmetadata");
335+
String documentId = "ml-config";
336+
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
337+
ElasticsearchMappings.DOC_TYPE, documentId)
338+
.setOpType(DocWriteRequest.OpType.CREATE);
339+
340+
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"));
341+
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
342+
builder.startObject();
343+
mlMetadata.toXContent(builder, params);
344+
builder.endObject();
345+
346+
indexRequest.setSource(builder);
347+
} catch (IOException e) {
348+
logger.error("failed to serialise mlmetadata", e);
349+
listener.onFailure(e);
350+
return;
351+
}
352+
353+
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(),
354+
ActionListener.<IndexResponse>wrap(
355+
indexResponse -> {
356+
listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED);
357+
},
358+
listener::onFailure),
359+
client::index
360+
);
361+
}
362+
363+
303364
public static Job updateJobForMigration(Job job) {
304365
Job.Builder builder = new Job.Builder(job);
305366
Map<String, Object> custom = job.getCustomSettings() == null ? new HashMap<>() : new HashMap<>(job.getCustomSettings());

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -69,34 +69,39 @@ private void setupMocks() {
6969

7070
public void testClusterChanged_info() {
7171
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
72-
notifier.onMaster();
7372

74-
DiscoveryNode node =
75-
new DiscoveryNode("node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT);
7673
ClusterState previous = ClusterState.builder(new ClusterName("_name"))
7774
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
7875
new PersistentTasksCustomMetaData(0L, Collections.emptyMap())))
7976
.build();
8077

8178
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
82-
addJobTask("job_id", "node_id", null, tasksBuilder);
79+
addJobTask("job_id", "_node_id", null, tasksBuilder);
8380
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build();
84-
ClusterState state = ClusterState.builder(new ClusterName("_name"))
81+
ClusterState newState = ClusterState.builder(new ClusterName("_name"))
8582
.metaData(metaData)
86-
.nodes(DiscoveryNodes.builder().add(node))
83+
// set local node master
84+
.nodes(DiscoveryNodes.builder()
85+
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))
86+
.localNodeId("_node_id")
87+
.masterNodeId("_node_id"))
8788
.build();
88-
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
89+
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
8990
verify(auditor, times(1)).info(eq("job_id"), any());
90-
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(state), any());
91+
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any());
9192

92-
notifier.offMaster();
93-
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
93+
// no longer master
94+
newState = ClusterState.builder(new ClusterName("_name"))
95+
.metaData(metaData)
96+
.nodes(DiscoveryNodes.builder()
97+
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)))
98+
.build();
99+
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
94100
verifyNoMoreInteractions(auditor);
95101
}
96102

97103
public void testClusterChanged_warning() {
98104
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
99-
notifier.onMaster();
100105

101106
ClusterState previous = ClusterState.builder(new ClusterName("_name"))
102107
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
@@ -106,21 +111,31 @@ public void testClusterChanged_warning() {
106111
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
107112
addJobTask("job_id", null, null, tasksBuilder);
108113
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build();
109-
ClusterState state = ClusterState.builder(new ClusterName("_name"))
114+
ClusterState newState = ClusterState.builder(new ClusterName("_name"))
110115
.metaData(metaData)
116+
// set local node master
117+
.nodes(DiscoveryNodes.builder()
118+
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
119+
.localNodeId("_node_id")
120+
.masterNodeId("_node_id"))
111121
.build();
112-
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
122+
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
113123
verify(auditor, times(1)).warning(eq("job_id"), any());
114-
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(state), any());
124+
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any());
115125

116-
notifier.offMaster();
117-
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
126+
// no longer master
127+
newState = ClusterState.builder(new ClusterName("_name"))
128+
.metaData(metaData)
129+
.nodes(DiscoveryNodes.builder()
130+
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)))
131+
.build();
132+
133+
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
118134
verifyNoMoreInteractions(auditor);
119135
}
120136

121137
public void testClusterChanged_noPersistentTaskChanges() {
122138
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
123-
notifier.onMaster();
124139

125140
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
126141
addJobTask("job_id", null, null, tasksBuilder);
@@ -129,14 +144,25 @@ public void testClusterChanged_noPersistentTaskChanges() {
129144
.metaData(metaData)
130145
.build();
131146

132-
ClusterState current = ClusterState.builder(new ClusterName("_name"))
147+
ClusterState newState = ClusterState.builder(new ClusterName("_name"))
133148
.metaData(metaData)
149+
// set local node master
150+
.nodes(DiscoveryNodes.builder()
151+
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
152+
.localNodeId("_node_id")
153+
.masterNodeId("_node_id"))
134154
.build();
135155

136-
notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous));
156+
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
137157
verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any());
138158

139-
notifier.offMaster();
159+
// no longer master
160+
newState = ClusterState.builder(new ClusterName("_name"))
161+
.metaData(metaData)
162+
.nodes(DiscoveryNodes.builder()
163+
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)))
164+
.build();
165+
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
140166
verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any());
141167
}
142168
}

0 commit comments

Comments
 (0)