Skip to content

Commit 5105df2

Browse files
[FEATURE][ML] Split in batches and migrate all jobs and datafeeds
Relates #32905
1 parent ec4601e commit 5105df2

File tree

2 files changed

+132
-40
lines changed

2 files changed

+132
-40
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java

Lines changed: 56 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.cluster.metadata.MetaData;
2525
import org.elasticsearch.cluster.service.ClusterService;
2626
import org.elasticsearch.common.settings.Settings;
27+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2728
import org.elasticsearch.common.xcontent.ToXContent;
2829
import org.elasticsearch.common.xcontent.ToXContentObject;
2930
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -38,6 +39,7 @@
3839
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
3940
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
4041
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
42+
import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor;
4143

4244
import java.io.IOException;
4345
import java.util.ArrayList;
@@ -96,14 +98,14 @@ public class MlConfigMigrator {
9698
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
9799

98100
private final AtomicBoolean migrationInProgress;
99-
private final AtomicBoolean firstTime;
101+
private final AtomicBoolean tookConfigSnapshot;
100102

101103
public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) {
102104
this.client = Objects.requireNonNull(client);
103105
this.clusterService = Objects.requireNonNull(clusterService);
104106
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
105107
this.migrationInProgress = new AtomicBoolean(false);
106-
this.firstTime = new AtomicBoolean(true);
108+
this.tookConfigSnapshot = new AtomicBoolean(false);
107109
}
108110

109111
/**
@@ -135,12 +137,7 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
135137
return;
136138
}
137139

138-
Collection<DatafeedConfig> stoppedDatafeeds = stoppedDatafeedConfigs(clusterState);
139-
Map<String, Job> eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
140-
.map(MlConfigMigrator::updateJobForMigration)
141-
.collect(Collectors.toMap(Job::getId, Function.identity(), (a, b) -> a));
142-
143-
JobsAndDatafeeds jobsAndDatafeedsToMigrate = limitWrites(stoppedDatafeeds, eligibleJobs);
140+
logger.debug("migrating ml configurations");
144141

145142
ActionListener<Boolean> unMarkMigrationInProgress = ActionListener.wrap(
146143
response -> {
@@ -153,37 +150,36 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
153150
}
154151
);
155152

156-
if (firstTime.get()) {
157-
snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap(
158-
response -> {
159-
firstTime.set(false);
160-
migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress);
161-
},
162-
unMarkMigrationInProgress::onFailure
163-
));
164-
return;
165-
}
153+
snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap(
154+
response -> {
155+
// We have successfully snapshotted the ML configs so we don't need to try again
156+
tookConfigSnapshot.set(true);
166157

167-
migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress);
158+
List<JobsAndDatafeeds> batches = splitInBatches(clusterState);
159+
if (batches.isEmpty()) {
160+
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
161+
return;
162+
}
163+
migrateBatches(batches, unMarkMigrationInProgress);
164+
},
165+
unMarkMigrationInProgress::onFailure
166+
));
168167
}
169168

170-
private void migrate(JobsAndDatafeeds jobsAndDatafeedsToMigrate, ActionListener<Boolean> listener) {
171-
if (jobsAndDatafeedsToMigrate.totalCount() == 0) {
172-
listener.onResponse(Boolean.FALSE);
173-
return;
174-
}
175-
176-
logger.debug("migrating ml configurations");
177-
178-
writeConfigToIndex(jobsAndDatafeedsToMigrate.datafeedConfigs, jobsAndDatafeedsToMigrate.jobs, ActionListener.wrap(
169+
private void migrateBatches(List<JobsAndDatafeeds> batches, ActionListener<Boolean> listener) {
170+
ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(EsExecutors.newDirectExecutorService(), true);
171+
for (JobsAndDatafeeds batch : batches) {
172+
chainTaskExecutor.add(chainedListener -> writeConfigToIndex(batch.datafeedConfigs, batch.jobs, ActionListener.wrap(
179173
failedDocumentIds -> {
180-
List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.jobs);
174+
List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, batch.jobs);
181175
List<String> successfulDatafeedWrites =
182-
filterFailedDatafeedConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.datafeedConfigs);
183-
removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, listener);
176+
filterFailedDatafeedConfigWrites(failedDocumentIds, batch.datafeedConfigs);
177+
removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, chainedListener);
184178
},
185-
listener::onFailure
186-
));
179+
chainedListener::onFailure
180+
)));
181+
}
182+
chainTaskExecutor.execute(ActionListener.wrap(aVoid -> listener.onResponse(true), listener::onFailure));
187183
}
188184

189185
// Exposed for testing
@@ -208,9 +204,9 @@ public void writeConfigToIndex(Collection<DatafeedConfig> datafeedsToMigrate,
208204
}
209205

210206
private void removeFromClusterState(List<String> jobsToRemoveIds, List<String> datafeedsToRemoveIds,
211-
ActionListener<Boolean> listener) {
207+
ActionListener<Void> listener) {
212208
if (jobsToRemoveIds.isEmpty() && datafeedsToRemoveIds.isEmpty()) {
213-
listener.onResponse(Boolean.FALSE);
209+
listener.onResponse(null);
214210
return;
215211
}
216212

@@ -244,7 +240,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
244240
logger.info("ml datafeed configurations migrated: {}", removedConfigs.get().removedDatafeedIds);
245241
}
246242
}
247-
listener.onResponse(Boolean.TRUE);
243+
listener.onResponse(null);
248244
}
249245
});
250246
}
@@ -326,12 +322,17 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To
326322
// public for testing
327323
public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listener) {
328324

325+
if (tookConfigSnapshot.get()) {
326+
listener.onResponse(true);
327+
return;
328+
}
329+
329330
if (mlMetadata.getJobs().isEmpty() && mlMetadata.getDatafeeds().isEmpty()) {
330-
listener.onResponse(Boolean.TRUE);
331+
listener.onResponse(true);
331332
return;
332333
}
333334

334-
logger.debug("taking a snapshot of mlmetadata");
335+
logger.debug("taking a snapshot of ml_metadata");
335336
String documentId = "ml-config";
336337
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
337338
ElasticsearchMappings.DOC_TYPE, documentId)
@@ -345,7 +346,7 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
345346

346347
indexRequest.setSource(builder);
347348
} catch (IOException e) {
348-
logger.error("failed to serialise mlmetadata", e);
349+
logger.error("failed to serialise ml_metadata", e);
349350
listener.onFailure(e);
350351
return;
351352
}
@@ -437,6 +438,22 @@ public int totalCount() {
437438
}
438439
}
439440

441+
public static List<JobsAndDatafeeds> splitInBatches(ClusterState clusterState) {
442+
Collection<DatafeedConfig> stoppedDatafeeds = stoppedDatafeedConfigs(clusterState);
443+
Map<String, Job> eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
444+
.map(MlConfigMigrator::updateJobForMigration)
445+
.collect(Collectors.toMap(Job::getId, Function.identity(), (a, b) -> a));
446+
447+
List<JobsAndDatafeeds> batches = new ArrayList<>();
448+
while (stoppedDatafeeds.isEmpty() == false || eligibleJobs.isEmpty() == false) {
449+
JobsAndDatafeeds batch = limitWrites(stoppedDatafeeds, eligibleJobs);
450+
batches.add(batch);
451+
stoppedDatafeeds.removeAll(batch.datafeedConfigs);
452+
batch.jobs.forEach(job -> eligibleJobs.remove(job.getId()));
453+
}
454+
return batches;
455+
}
456+
440457
/**
441458
* Return at most {@link #MAX_BULK_WRITE_SIZE} configs favouring
442459
* datafeed and job pairs so if a datafeed is chosen so is its job.

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ public void testWriteConfigToIndex() throws InterruptedException {
109109
}
110110

111111
public void testMigrateConfigs() throws InterruptedException, IOException {
112-
113112
// and jobs and datafeeds clusterstate
114113
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
115114
mlMetadata.putJob(buildJobBuilder("job-foo").build(), false);
@@ -166,6 +165,82 @@ public void testMigrateConfigs() throws InterruptedException, IOException {
166165
assertEquals("df-1", datafeedsHolder.get().get(0).getId());
167166
}
168167

168+
public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws InterruptedException {
169+
int jobCount = randomIntBetween(150, 201);
170+
int datafeedCount = randomIntBetween(150, jobCount);
171+
172+
// and jobs and datafeeds clusterstate
173+
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
174+
for (int i = 0; i < jobCount; i++) {
175+
mlMetadata.putJob(buildJobBuilder("job-" + i).build(), false);
176+
}
177+
for (int i = 0; i < datafeedCount; i++) {
178+
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("df-" + i, "job-" + i);
179+
builder.setIndices(Collections.singletonList("beats*"));
180+
mlMetadata.putDatafeed(builder.build(), Collections.emptyMap());
181+
}
182+
183+
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
184+
.metaData(MetaData.builder()
185+
.putCustom(MlMetadata.TYPE, mlMetadata.build()))
186+
.build();
187+
188+
doAnswer(invocation -> {
189+
ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1];
190+
listener.clusterStateProcessed("source", mock(ClusterState.class), mock(ClusterState.class));
191+
return null;
192+
}).when(clusterService).submitStateUpdateTask(eq("remove-migrated-ml-configs"), any());
193+
194+
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
195+
AtomicReference<Boolean> responseHolder = new AtomicReference<>();
196+
197+
// do the migration
198+
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
199+
blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
200+
responseHolder, exceptionHolder);
201+
202+
assertNull(exceptionHolder.get());
203+
assertTrue(responseHolder.get());
204+
205+
// check the jobs have been migrated
206+
AtomicReference<List<Job.Builder>> jobsHolder = new AtomicReference<>();
207+
JobConfigProvider jobConfigProvider = new JobConfigProvider(client());
208+
blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener),
209+
jobsHolder, exceptionHolder);
210+
211+
assertNull(exceptionHolder.get());
212+
assertThat(jobsHolder.get(), hasSize(jobCount));
213+
214+
// check datafeeds are migrated
215+
DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client(), xContentRegistry());
216+
AtomicReference<List<DatafeedConfig.Builder>> datafeedsHolder = new AtomicReference<>();
217+
blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", true, actionListener),
218+
datafeedsHolder, exceptionHolder);
219+
220+
assertNull(exceptionHolder.get());
221+
assertThat(datafeedsHolder.get(), hasSize(datafeedCount));
222+
}
223+
224+
public void testMigrateConfigs_GivenNoJobsOrDatafeeds() throws InterruptedException {
225+
// Add empty ML metadata
226+
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
227+
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
228+
.metaData(MetaData.builder()
229+
.putCustom(MlMetadata.TYPE, mlMetadata.build()))
230+
.build();
231+
232+
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
233+
AtomicReference<Boolean> responseHolder = new AtomicReference<>();
234+
235+
// do the migration
236+
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
237+
blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
238+
responseHolder, exceptionHolder);
239+
240+
assertNull(exceptionHolder.get());
241+
assertFalse(responseHolder.get());
242+
}
243+
169244
public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws InterruptedException {
170245
Settings settings = Settings.builder().put(nodeSettings())
171246
.put(MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION.getKey(), false)

0 commit comments

Comments (0)