[FEATURE][ML] Split in batches and migrate all jobs and datafeeds #36716

Merged
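
In outline, the change replaces the single bulk write of eligible job and datafeed configurations with a split-then-chain flow: the cluster state is divided into batches, and each batch is written to the config index and pruned from cluster state before the next batch starts. The sketch below is a simplified illustration of that flow, not the code in the diff: the real signatures differ, the pair-favouring selection done by limitWrites() is omitted, and the value of MAX_BULK_WRITE_SIZE is assumed.

    // Simplified outline of the flow this PR introduces; real signatures differ and the
    // pair-favouring selection done by limitWrites() is omitted.
    import java.util.ArrayList;
    import java.util.List;

    class MigrationFlowSketch {

        static final int MAX_BULK_WRITE_SIZE = 100; // assumed limit; see limitWrites() in the diff

        // Drain the eligible configs into batches of at most MAX_BULK_WRITE_SIZE entries each.
        static List<List<String>> splitInBatches(List<String> eligibleConfigIds) {
            List<List<String>> batches = new ArrayList<>();
            for (int from = 0; from < eligibleConfigIds.size(); from += MAX_BULK_WRITE_SIZE) {
                int to = Math.min(from + MAX_BULK_WRITE_SIZE, eligibleConfigIds.size());
                batches.add(new ArrayList<>(eligibleConfigIds.subList(from, to)));
            }
            return batches;
        }

        // Each batch is written to the config index and removed from cluster state before
        // the next one starts; in the real code this chaining is asynchronous.
        static void migrateBatches(List<List<String>> batches) {
            for (List<String> batch : batches) {
                System.out.println("writing " + batch.size() + " configs, then pruning cluster state");
            }
        }

        public static void main(String[] args) {
            List<String> ids = new ArrayList<>();
            for (int i = 0; i < 430; i++) {
                ids.add("config-" + i);          // e.g. 250 jobs + 180 datafeeds
            }
            migrateBatches(splitInBatches(ids)); // -> 5 batches with the assumed limit of 100
        }
    }
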
@@ -24,6 +24,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -38,6 +39,7 @@
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor;

import java.io.IOException;
import java.util.ArrayList;
@@ -96,14 +98,14 @@ public class MlConfigMigrator {
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;

private final AtomicBoolean migrationInProgress;
private final AtomicBoolean firstTime;
private final AtomicBoolean tookConfigSnapshot;

public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) {
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.migrationInProgress = new AtomicBoolean(false);
this.firstTime = new AtomicBoolean(true);
this.tookConfigSnapshot = new AtomicBoolean(false);
}

/**
@@ -135,12 +137,7 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
return;
}

Collection<DatafeedConfig> stoppedDatafeeds = stoppedDatafeedConfigs(clusterState);
Map<String, Job> eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
.map(MlConfigMigrator::updateJobForMigration)
.collect(Collectors.toMap(Job::getId, Function.identity(), (a, b) -> a));

JobsAndDatafeeds jobsAndDatafeedsToMigrate = limitWrites(stoppedDatafeeds, eligibleJobs);
logger.debug("migrating ml configurations");

ActionListener<Boolean> unMarkMigrationInProgress = ActionListener.wrap(
response -> {
@@ -153,37 +150,36 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
}
);

if (firstTime.get()) {
snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap(
response -> {
firstTime.set(false);
migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
));
return;
}
snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap(
response -> {
// We have successfully snapshotted the ML configs so we don't need to try again
tookConfigSnapshot.set(true);

migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress);
List<JobsAndDatafeeds> batches = splitInBatches(clusterState);
if (batches.isEmpty()) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}
migrateBatches(batches, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
));
}

private void migrate(JobsAndDatafeeds jobsAndDatafeedsToMigrate, ActionListener<Boolean> listener) {
if (jobsAndDatafeedsToMigrate.totalCount() == 0) {
listener.onResponse(Boolean.FALSE);
return;
}

logger.debug("migrating ml configurations");

writeConfigToIndex(jobsAndDatafeedsToMigrate.datafeedConfigs, jobsAndDatafeedsToMigrate.jobs, ActionListener.wrap(
private void migrateBatches(List<JobsAndDatafeeds> batches, ActionListener<Boolean> listener) {
ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(EsExecutors.newDirectExecutorService(), true);
for (JobsAndDatafeeds batch : batches) {
chainTaskExecutor.add(chainedListener -> writeConfigToIndex(batch.datafeedConfigs, batch.jobs, ActionListener.wrap(
failedDocumentIds -> {
List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.jobs);
List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, batch.jobs);
List<String> successfulDatafeedWrites =
filterFailedDatafeedConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.datafeedConfigs);
removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, listener);
filterFailedDatafeedConfigWrites(failedDocumentIds, batch.datafeedConfigs);
removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, chainedListener);
},
listener::onFailure
));
chainedListener::onFailure
)));
}
chainTaskExecutor.execute(ActionListener.wrap(aVoid -> listener.onResponse(true), listener::onFailure));
}

// Exposed for testing
@@ -208,9 +204,9 @@ public void writeConfigToIndex(Collection<DatafeedConfig> datafeedsToMigrate,
}

private void removeFromClusterState(List<String> jobsToRemoveIds, List<String> datafeedsToRemoveIds,
ActionListener<Boolean> listener) {
ActionListener<Void> listener) {
if (jobsToRemoveIds.isEmpty() && datafeedsToRemoveIds.isEmpty()) {
listener.onResponse(Boolean.FALSE);
listener.onResponse(null);
return;
}

@@ -244,7 +240,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
logger.info("ml datafeed configurations migrated: {}", removedConfigs.get().removedDatafeedIds);
}
}
listener.onResponse(Boolean.TRUE);
listener.onResponse(null);
}
});
}
@@ -326,12 +322,17 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To
// public for testing
public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listener) {

if (tookConfigSnapshot.get()) {
listener.onResponse(true);
return;
}

if (mlMetadata.getJobs().isEmpty() && mlMetadata.getDatafeeds().isEmpty()) {
listener.onResponse(Boolean.TRUE);
listener.onResponse(true);
return;
}

logger.debug("taking a snapshot of mlmetadata");
logger.debug("taking a snapshot of ml_metadata");
String documentId = "ml-config";
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
ElasticsearchMappings.DOC_TYPE, documentId)
@@ -345,7 +346,7 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListen

indexRequest.setSource(builder);
} catch (IOException e) {
logger.error("failed to serialise mlmetadata", e);
logger.error("failed to serialise ml_metadata", e);
listener.onFailure(e);
return;
}
@@ -437,6 +438,22 @@ public int totalCount() {
}
}

public static List<JobsAndDatafeeds> splitInBatches(ClusterState clusterState) {
Collection<DatafeedConfig> stoppedDatafeeds = stoppedDatafeedConfigs(clusterState);
Map<String, Job> eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
.map(MlConfigMigrator::updateJobForMigration)
.collect(Collectors.toMap(Job::getId, Function.identity(), (a, b) -> a));

List<JobsAndDatafeeds> batches = new ArrayList<>();
while (stoppedDatafeeds.isEmpty() == false || eligibleJobs.isEmpty() == false) {
JobsAndDatafeeds batch = limitWrites(stoppedDatafeeds, eligibleJobs);
batches.add(batch);
stoppedDatafeeds.removeAll(batch.datafeedConfigs);
batch.jobs.forEach(job -> eligibleJobs.remove(job.getId()));
}
return batches;
}

/**
* Return at most {@link #MAX_BULK_WRITE_SIZE} configs favouring
* datafeed and job pairs so if a datafeed is chosen so is its job.
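
The migrateBatches method above chains the per-batch writes through ChainTaskExecutor (org.elasticsearch.xpack.ml.utils.ChainTaskExecutor), whose implementation is not part of this diff. The sketch below models only the behaviour the migrator appears to rely on — tasks run strictly one after another on the supplied executor, and with the short-circuit flag set the chain aborts on the first failure. It is an approximation, not the actual class; the listener type is a stand-in. The accompanying test changes follow it.

    // Approximate, simplified model of the sequential task chaining used above.
    // Listener is a stand-in for org.elasticsearch.action.ActionListener.
    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.concurrent.ExecutorService;

    class ChainTaskExecutorSketch {

        interface Listener<T> {
            void onResponse(T result);
            void onFailure(Exception e);
        }

        interface ChainTask {
            void run(Listener<Void> listener);
        }

        private final ExecutorService executor;
        private final boolean shortCircuit;          // stop the chain on the first failure
        private final Queue<ChainTask> tasks = new LinkedList<>();

        ChainTaskExecutorSketch(ExecutorService executor, boolean shortCircuit) {
            this.executor = executor;
            this.shortCircuit = shortCircuit;
        }

        void add(ChainTask task) {
            tasks.add(task);
        }

        void execute(Listener<Void> finalListener) {
            runNext(finalListener);
        }

        private void runNext(Listener<Void> finalListener) {
            ChainTask next = tasks.poll();
            if (next == null) {
                finalListener.onResponse(null);       // all batches done
                return;
            }
            executor.execute(() -> next.run(new Listener<Void>() {
                @Override
                public void onResponse(Void ignored) {
                    runNext(finalListener);           // only start the next batch once this one finished
                }

                @Override
                public void onFailure(Exception e) {
                    if (shortCircuit) {
                        finalListener.onFailure(e);   // abort the remaining batches
                    } else {
                        runNext(finalListener);
                    }
                }
            }));
        }
    }
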
@@ -109,7 +109,6 @@ public void testWriteConfigToIndex() throws InterruptedException {
}

public void testMigrateConfigs() throws InterruptedException, IOException {

// and jobs and datafeeds clusterstate
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
mlMetadata.putJob(buildJobBuilder("job-foo").build(), false);
@@ -166,6 +165,82 @@ public void testMigrateConfigs() throws InterruptedException, IOException {
assertEquals("df-1", datafeedsHolder.get().get(0).getId());
}

public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws InterruptedException {
int jobCount = randomIntBetween(150, 201);
int datafeedCount = randomIntBetween(150, jobCount);

// and jobs and datafeeds clusterstate
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
for (int i = 0; i < jobCount; i++) {
mlMetadata.putJob(buildJobBuilder("job-" + i).build(), false);
}
for (int i = 0; i < datafeedCount; i++) {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("df-" + i, "job-" + i);
builder.setIndices(Collections.singletonList("beats*"));
mlMetadata.putDatafeed(builder.build(), Collections.emptyMap());
}

ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build()))
.build();

doAnswer(invocation -> {
ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1];
listener.clusterStateProcessed("source", mock(ClusterState.class), mock(ClusterState.class));
return null;
}).when(clusterService).submitStateUpdateTask(eq("remove-migrated-ml-configs"), any());

AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
AtomicReference<Boolean> responseHolder = new AtomicReference<>();

// do the migration
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
responseHolder, exceptionHolder);

assertNull(exceptionHolder.get());
assertTrue(responseHolder.get());

// check the jobs have been migrated
AtomicReference<List<Job.Builder>> jobsHolder = new AtomicReference<>();
JobConfigProvider jobConfigProvider = new JobConfigProvider(client());
blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener),
jobsHolder, exceptionHolder);

assertNull(exceptionHolder.get());
assertThat(jobsHolder.get(), hasSize(jobCount));

// check datafeeds are migrated
DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client(), xContentRegistry());
AtomicReference<List<DatafeedConfig.Builder>> datafeedsHolder = new AtomicReference<>();
blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", true, actionListener),
datafeedsHolder, exceptionHolder);

assertNull(exceptionHolder.get());
assertThat(datafeedsHolder.get(), hasSize(datafeedCount));
}

public void testMigrateConfigs_GivenNoJobsOrDatafeeds() throws InterruptedException {
// Add empty ML metadata
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build()))
.build();

AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
AtomicReference<Boolean> responseHolder = new AtomicReference<>();

// do the migration
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
responseHolder, exceptionHolder);

assertNull(exceptionHolder.get());
assertFalse(responseHolder.get());
}

public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws InterruptedException {
Settings settings = Settings.builder().put(nodeSettings())
.put(MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION.getKey(), false)
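
The tests above drive the asynchronous migrator through a blockingCall helper defined in the shared test base class, which this diff does not show. A latch-based adapter along these lines would behave equivalently; it is illustrative only, and the real helper may differ in detail.

    // Illustrative latch-based adapter from an async ActionListener call to a blocking test call.
    // ActionListener here is a stand-in; the real helper lives in the ML test base class.
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Consumer;

    final class BlockingCallSketch {

        interface ActionListener<T> {
            void onResponse(T response);
            void onFailure(Exception e);
        }

        static <T> void blockingCall(Consumer<ActionListener<T>> function,
                                     AtomicReference<T> response,
                                     AtomicReference<Exception> error) throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);
            function.accept(new ActionListener<T>() {
                @Override
                public void onResponse(T r) {
                    response.set(r);      // capture the async result for assertions
                    latch.countDown();
                }

                @Override
                public void onFailure(Exception e) {
                    error.set(e);         // capture the failure instead of losing it on another thread
                    latch.countDown();
                }
            });
            latch.await();                // block the test thread until the listener fires
        }
    }
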