diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/XPackIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/XPackIT.java index b10d56ca8d3bb..33631d7f4354c 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/XPackIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/XPackIT.java @@ -89,6 +89,7 @@ public void testIndexTemplatesCreated() throws Exception { if (masterIsNewVersion()) { // Everything else waits until the master is upgraded to create its templates expectedTemplates.add(".ml-anomalies-"); + expectedTemplates.add(".ml-config"); expectedTemplates.add(".ml-meta"); expectedTemplates.add(".ml-notifications"); expectedTemplates.add(".ml-state"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 64b6e89cf56c2..f815efee1cf66 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -410,14 +410,14 @@ private void checkDatafeedIsStopped(Supplier msg, String datafeedId, Per } } - private Builder putJobs(Collection jobs) { + public Builder putJobs(Collection jobs) { for (Job job : jobs) { putJob(job, true); } return this; } - private Builder putDatafeeds(Collection datafeeds) { + public Builder putDatafeeds(Collection datafeeds) { for (DatafeedConfig datafeed : datafeeds) { this.datafeeds.put(datafeed.getId(), datafeed); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index 5ba04fcc4087c..e78649d152296 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -98,6 +98,23 @@ public static Set openJobIds(@Nullable PersistentTasksCustomMetaData tas .collect(Collectors.toSet()); } + /** + * The datafeed Ids of started datafeed tasks + * + * @param tasks Persistent tasks. If null an empty set is returned. + * @return The Ids of running datafeed tasks + */ + public static Set startedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks) { + if (tasks == null) { + return Collections.emptySet(); + } + + return tasks.findTasks(DATAFEED_TASK_NAME, task -> true) + .stream() + .map(t -> t.getId().substring(DATAFEED_TASK_ID_PREFIX.length())) + .collect(Collectors.toSet()); + } + /** * Is there an ml anomaly detector job task for the job {@code jobId}? * @param jobId The job id diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java index c3579fe4173b8..408520472c4f2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java @@ -91,6 +91,26 @@ public void testOpenJobIds_GivenNull() { assertThat(MlTasks.openJobIds(null), empty()); } + public void testStartedDatafeedIds() { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + assertThat(MlTasks.openJobIds(tasksBuilder.build()), empty()); + + tasksBuilder.addTask(MlTasks.jobTaskId("job-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + tasksBuilder.addTask(MlTasks.datafeedTaskId("df1"), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams("df1", 0L), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + tasksBuilder.addTask(MlTasks.datafeedTaskId("df2"), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams("df2", 0L), + new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment")); + + assertThat(MlTasks.startedDatafeedIds(tasksBuilder.build()), containsInAnyOrder("df1", "df2")); + } + + public void testStartedDatafeedIds_GivenNull() { + assertThat(MlTasks.startedDatafeedIds(null), empty()); + } + public void testTaskExistsForJob() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build())); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index cd25fdd060588..b33b0e472ea7f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -443,7 +443,7 @@ public Collection createComponents(Client client, ClusterService cluster jobDataCountsPersister, datafeedManager, auditor, - new MlAssignmentNotifier(auditor, clusterService), + new MlAssignmentNotifier(auditor, threadPool, client, clusterService), memoryTracker ); } @@ -458,7 +458,7 @@ public List> getPersistentTasksExecutor(ClusterServic return Arrays.asList( new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get(), - memoryTracker.get()), + memoryTracker.get(), client), new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, datafeedManager.get()) ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index 67486ba75792a..9479d1281d6de 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -7,7 +7,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -29,12 +33,23 @@ public class MlAssignmentNotifier implements ClusterStateListener, LocalNodeMast private final Auditor auditor; private final ClusterService clusterService; - + private final MlConfigMigrator mlConfigMigrator; + private final ThreadPool threadPool; private final AtomicBoolean enabled = new AtomicBoolean(false); - MlAssignmentNotifier(Auditor auditor, ClusterService clusterService) { + MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) { + this.auditor = auditor; + this.clusterService = clusterService; + this.mlConfigMigrator = new MlConfigMigrator(client, clusterService); + this.threadPool = threadPool; + clusterService.addLocalNodeMasterListener(this); + } + + MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, MlConfigMigrator mlConfigMigrator, ClusterService clusterService) { this.auditor = auditor; this.clusterService = clusterService; + this.mlConfigMigrator = mlConfigMigrator; + this.threadPool = threadPool; clusterService.addLocalNodeMasterListener(this); } @@ -71,6 +86,25 @@ public void clusterChanged(ClusterChangedEvent event) { return; } + Version minNodeVersion = event.state().nodes().getMinNodeVersion(); + if (minNodeVersion.onOrAfter(Version.V_6_6_0)) { + // ok to migrate + mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap( + response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())), + e -> { + logger.error("error migrating ml configurations", e); + threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())); + } + )); + } else { + threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())); + } + + } + + private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, PersistentTasksCustomMetaData previous, + ClusterState state) { + for (PersistentTask currentTask : current.tasks()) { Assignment currentAssignment = currentTask.getAssignment(); PersistentTask previousTask = previous != null ? previous.getTask(currentTask.getId()) : null; @@ -83,7 +117,7 @@ public void clusterChanged(ClusterChangedEvent event) { if (currentAssignment.getExecutorNode() == null) { auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]"); } else { - DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode()); + DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode()); auditor.info(jobId, "Opening job on node [" + node.toString() + "]"); } } else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) { @@ -97,7 +131,7 @@ public void clusterChanged(ClusterChangedEvent event) { auditor.warning(jobId, msg); } } else { - DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode()); + DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode()); if (jobId != null) { auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]"); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java new file mode 100644 index 0000000000000..22dc43b9326fb --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -0,0 +1,384 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + +/** + * Migrates job and datafeed configurations from the clusterstate to + * index documents. + * + * There are 3 steps to the migration process + * 1. Read config from the clusterstate + * - If a job or datafeed is added after this call it will be added to the index + * - If deleted then it's possible the config will be copied before it is deleted. + * Mitigate against this by filtering out jobs marked as deleting + * 2. Copy the config to the index + * - The index operation could fail, don't delete from clusterstate in this case + * 3. Remove config from the clusterstate + * - Before this happens config is duplicated in index and clusterstate, all ops + * must prefer to use the index config at this stage + * - If the clusterstate update fails then the config will remain duplicated + * and the migration process should try again + * + * If there was an error in step 3 and the config is in both the clusterstate and + * index then when the migrator retries it must not overwrite an existing job config + * document as once the index document is present all update operations will function + * on that rather than the clusterstate + */ +public class MlConfigMigrator { + + private static final Logger logger = LogManager.getLogger(MlConfigMigrator.class); + + public static final String MIGRATED_FROM_VERSION = "migrated from version"; + + private final Client client; + private final ClusterService clusterService; + + private final AtomicBoolean migrationInProgress; + + public MlConfigMigrator(Client client, ClusterService clusterService) { + this.client = client; + this.clusterService = clusterService; + this.migrationInProgress = new AtomicBoolean(false); + } + + /** + * Migrate ml job and datafeed configurations from the clusterstate + * to index documents. + * + * Configs to be migrated are read from the cluster state then bulk + * indexed into .ml-config. Those successfully indexed are then removed + * from the clusterstate. + * + * Migrated jobs have the job version set to v6.6.0 and the custom settings + * map has an entry added recording the fact the job was migrated and its + * original version e.g. + * "migrated from version" : v6.1.0 + * + * + * @param clusterState The current clusterstate + * @param listener The success listener + */ + public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener listener) { + + if (migrationInProgress.compareAndSet(false, true) == false) { + listener.onResponse(Boolean.FALSE); + return; + } + + Collection datafeedsToMigrate = stoppedDatafeedConfigs(clusterState); + List jobsToMigrate = nonDeletingJobs(closedJobConfigs(clusterState)).stream() + .map(MlConfigMigrator::updateJobForMigration) + .collect(Collectors.toList()); + + ActionListener unMarkMigrationInProgress = ActionListener.wrap( + response -> { + migrationInProgress.set(false); + listener.onResponse(response); + }, + e -> { + migrationInProgress.set(false); + listener.onFailure(e); + } + ); + + if (datafeedsToMigrate.isEmpty() && jobsToMigrate.isEmpty()) { + unMarkMigrationInProgress.onResponse(Boolean.FALSE); + return; + } + + writeConfigToIndex(datafeedsToMigrate, jobsToMigrate, ActionListener.wrap( + failedDocumentIds -> { + List successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsToMigrate); + List successfulDatafeedWrites = + filterFailedDatafeedConfigWrites(failedDocumentIds, datafeedsToMigrate); + removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, unMarkMigrationInProgress); + }, + unMarkMigrationInProgress::onFailure + )); + } + + // Exposed for testing + public void writeConfigToIndex(Collection datafeedsToMigrate, + Collection jobsToMigrate, + ActionListener> listener) { + + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + addJobIndexRequests(jobsToMigrate, bulkRequestBuilder); + addDatafeedIndexRequests(datafeedsToMigrate, bulkRequestBuilder); + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, bulkRequestBuilder.request(), + ActionListener.wrap( + bulkResponse -> { + Set failedDocumentIds = documentsNotWritten(bulkResponse); + listener.onResponse(failedDocumentIds); + }, + listener::onFailure), + client::bulk + ); + } + + private void removeFromClusterState(List jobsToRemoveIds, List datafeedsToRemoveIds, + ActionListener listener) { + if (jobsToRemoveIds.isEmpty() && datafeedsToRemoveIds.isEmpty()) { + listener.onResponse(Boolean.FALSE); + return; + } + + AtomicReference removedConfigs = new AtomicReference<>(); + + clusterService.submitStateUpdateTask("remove-migrated-ml-configs", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + RemovalResult removed = removeJobsAndDatafeeds(jobsToRemoveIds, datafeedsToRemoveIds, + MlMetadata.getMlMetadata(currentState)); + removedConfigs.set(removed); + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(MlMetadata.TYPE, removed.mlMetadata) + .build()); + return newState.build(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (removedConfigs.get() != null) { + if (removedConfigs.get().removedJobIds.isEmpty() == false) { + logger.info("ml job configurations migrated: {}", removedConfigs.get().removedJobIds); + } + if (removedConfigs.get().removedDatafeedIds.isEmpty() == false) { + logger.info("ml datafeed configurations migrated: {}", removedConfigs.get().removedDatafeedIds); + } + } + listener.onResponse(Boolean.TRUE); + } + }); + } + + static class RemovalResult { + MlMetadata mlMetadata; + List removedJobIds; + List removedDatafeedIds; + + RemovalResult(MlMetadata mlMetadata, List removedJobIds, List removedDatafeedIds) { + this.mlMetadata = mlMetadata; + this.removedJobIds = removedJobIds; + this.removedDatafeedIds = removedDatafeedIds; + } + } + + /** + * Remove the datafeeds and jobs listed in the parameters from + * mlMetadata if they exist. An account of removed jobs and datafeeds + * is returned in the result structure alongside a new MlMetadata + * with the config removed. + * + * @param jobsToRemove Jobs + * @param datafeedsToRemove Datafeeds + * @param mlMetadata MlMetadata + * @return Structure tracking which jobs and datafeeds were actually removed + * and the new MlMetadata + */ + static RemovalResult removeJobsAndDatafeeds(List jobsToRemove, List datafeedsToRemove, MlMetadata mlMetadata) { + Map currentJobs = new HashMap<>(mlMetadata.getJobs()); + List removedJobIds = new ArrayList<>(); + for (String jobId : jobsToRemove) { + if (currentJobs.remove(jobId) != null) { + removedJobIds.add(jobId); + } + } + + Map currentDatafeeds = new HashMap<>(mlMetadata.getDatafeeds()); + List removedDatafeedIds = new ArrayList<>(); + for (String datafeedId : datafeedsToRemove) { + if (currentDatafeeds.remove(datafeedId) != null) { + removedDatafeedIds.add(datafeedId); + } + } + + MlMetadata.Builder builder = new MlMetadata.Builder(); + builder.setLastMemoryRefreshVersion(mlMetadata.getLastMemoryRefreshVersion()) + .putJobs(currentJobs.values()) + .putDatafeeds(currentDatafeeds.values()); + + return new RemovalResult(builder.build(), removedJobIds, removedDatafeedIds); + } + + private void addJobIndexRequests(Collection jobs, BulkRequestBuilder bulkRequestBuilder) { + ToXContent.Params params = new ToXContent.MapParams(JobConfigProvider.TO_XCONTENT_PARAMS); + for (Job job : jobs) { + bulkRequestBuilder.add(indexRequest(job, Job.documentId(job.getId()), params)); + } + } + + private void addDatafeedIndexRequests(Collection datafeedConfigs, BulkRequestBuilder bulkRequestBuilder) { + ToXContent.Params params = new ToXContent.MapParams(DatafeedConfigProvider.TO_XCONTENT_PARAMS); + for (DatafeedConfig datafeedConfig : datafeedConfigs) { + bulkRequestBuilder.add(indexRequest(datafeedConfig, DatafeedConfig.documentId(datafeedConfig.getId()), params)); + } + } + + private IndexRequest indexRequest(ToXContentObject source, String documentId, ToXContent.Params params) { + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, documentId); + + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + indexRequest.source(source.toXContent(builder, params)); + } catch (IOException e) { + throw new IllegalStateException("failed to serialise object [" + documentId + "]", e); + } + return indexRequest; + } + + public static Job updateJobForMigration(Job job) { + Job.Builder builder = new Job.Builder(job); + Map custom = job.getCustomSettings() == null ? new HashMap<>() : new HashMap<>(job.getCustomSettings()); + custom.put(MIGRATED_FROM_VERSION, job.getJobVersion()); + builder.setCustomSettings(custom); + // Pre v5.5 (ml beta) jobs do not have a version. + // These jobs cannot be opened, we rely on the missing version + // to indicate this. + // See TransportOpenJobAction.validate() + if (job.getJobVersion() != null) { + builder.setJobVersion(Version.CURRENT); + } + return builder.build(); + } + + /** + * Filter jobs marked as deleting from the list of jobs + * are not marked as deleting. + * + * @param jobs The jobs to filter + * @return Jobs not marked as deleting + */ + public static List nonDeletingJobs(List jobs) { + return jobs.stream() + .filter(job -> job.isDeleting() == false) + .collect(Collectors.toList()); + } + + /** + * Find the configurations for all closed jobs in the cluster state. + * Closed jobs are those that do not have an associated persistent task. + * + * @param clusterState The cluster state + * @return The closed job configurations + */ + public static List closedJobConfigs(ClusterState clusterState) { + PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + Set openJobIds = MlTasks.openJobIds(persistentTasks); + + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + return mlMetadata.getJobs().values().stream() + .filter(job -> openJobIds.contains(job.getId()) == false) + .collect(Collectors.toList()); + } + + /** + * Find the configurations for stopped datafeeds in the cluster state. + * Stopped datafeeds are those that do not have an associated persistent task. + * + * @param clusterState The cluster state + * @return The closed job configurations + */ + public static List stoppedDatafeedConfigs(ClusterState clusterState) { + PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + Set startedDatafeedIds = MlTasks.startedDatafeedIds(persistentTasks); + + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + return mlMetadata.getDatafeeds().values().stream() + .filter(datafeedConfig-> startedDatafeedIds.contains(datafeedConfig.getId()) == false) + .collect(Collectors.toList()); + } + + /** + * Check for failures in the bulk response and return the + * Ids of any documents not written to the index + * + * If the index operation failed because the document already + * exists this is not considered an error. + * + * @param response BulkResponse + * @return The set of document Ids not written by the bulk request + */ + static Set documentsNotWritten(BulkResponse response) { + Set failedDocumentIds = new HashSet<>(); + + for (BulkItemResponse itemResponse : response.getItems()) { + if (itemResponse.isFailed()) { + BulkItemResponse.Failure failure = itemResponse.getFailure(); + failedDocumentIds.add(itemResponse.getFailure().getId()); + logger.info("failed to index ml configuration [" + itemResponse.getFailure().getId() + "], " + + itemResponse.getFailure().getMessage()); + } else { + logger.info("ml configuration [" + itemResponse.getId() + "] indexed"); + } + } + return failedDocumentIds; + } + + static List filterFailedJobConfigWrites(Set failedDocumentIds, List jobs) { + return jobs.stream() + .map(Job::getId) + .filter(id -> failedDocumentIds.contains(Job.documentId(id)) == false) + .collect(Collectors.toList()); + } + + static List filterFailedDatafeedConfigWrites(Set failedDocumentIds, Collection datafeeds) { + return datafeeds.stream() + .map(DatafeedConfig::getId) + .filter(id -> failedDocumentIds.contains(DatafeedConfig.documentId(id)) == false) + .collect(Collectors.toList()); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 016fcd5e9287b..5a635aeed8645 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -6,15 +6,12 @@ package org.elasticsearch.xpack.ml; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.LifecycleListener; -import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.threadpool.ThreadPool; -class MlInitializationService extends AbstractComponent implements ClusterStateListener { +class MlInitializationService implements LocalNodeMasterListener { private final ThreadPool threadPool; private final ClusterService clusterService; @@ -26,21 +23,21 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL this.threadPool = threadPool; this.clusterService = clusterService; this.client = client; - clusterService.addListener(this); } @Override - public void clusterChanged(ClusterChangedEvent event) { - if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { - // Wait until the gateway has recovered from disk. - return; - } + public void onMaster() { + installDailyMaintenanceService(); + } - if (event.localNodeMaster()) { - installDailyMaintenanceService(); - } else { - uninstallDailyMaintenanceService(); - } + @Override + public void offMaster() { + uninstallDailyMaintenanceService(); + } + + @Override + public String executorName() { + return ThreadPool.Names.GENERIC; } private void installDailyMaintenanceService() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index d6c93d713e554..63b8f90a114d0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -31,7 +31,6 @@ import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; -import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -53,9 +52,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; - public class TransportCloseJobAction extends TransportTasksAction { @@ -443,10 +439,7 @@ void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitFo }, request.getCloseTimeout(), new ActionListener() { @Override public void onResponse(Boolean result) { - FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request( - waitForCloseRequest.jobsToFinalize.toArray(new String[0])); - executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest, - ActionListener.wrap(r -> listener.onResponse(response), listener::onFailure)); + listener.onResponse(response); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index c9315a178148d..23c45c4c69c27 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -56,6 +56,7 @@ import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; @@ -804,6 +805,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut private final AutodetectProcessManager autodetectProcessManager; private final MlMemoryTracker memoryTracker; + private final Client client; /** * The maximum number of open jobs can be different on each node. However, nodes on older versions @@ -817,10 +819,12 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut private volatile int maxLazyMLNodes; public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService, - AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker) { + AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker, + Client client) { super(MlTasks.JOB_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME); this.autodetectProcessManager = autodetectProcessManager; this.memoryTracker = memoryTracker; + this.client = client; this.fallbackMaxNumberOfOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings); this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings); this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings); @@ -890,9 +894,15 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara return; } + String jobId = jobTask.getJobId(); autodetectProcessManager.openJob(jobTask, e2 -> { if (e2 == null) { - task.markAsCompleted(); + FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(new String[]{jobId}); + executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest, + ActionListener.wrap( + response -> task.markAsCompleted(), + e -> logger.error("error finalizing job [" + jobId + "]", e) + )); } else { task.markAsFailed(e2); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java index 62eb4e5039568..c6949ef0ebbf3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java @@ -57,7 +57,6 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction TO_XCONTENT_PARAMS = new HashMap<>(); + public static final Map TO_XCONTENT_PARAMS; static { - TO_XCONTENT_PARAMS.put(ToXContentParams.FOR_INTERNAL_STORAGE, "true"); - TO_XCONTENT_PARAMS.put(ToXContentParams.INCLUDE_TYPE, "true"); + Map modifiable = new HashMap<>(); + modifiable.put(ToXContentParams.FOR_INTERNAL_STORAGE, "true"); + modifiable.put(ToXContentParams.INCLUDE_TYPE, "true"); + TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable); } public DatafeedConfigProvider(Client client, NamedXContentRegistry xContentRegistry) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index e48bdfcd1f26a..0bb32af9b2e1d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -67,14 +67,17 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -91,6 +94,13 @@ public class JobConfigProvider { private static final Logger logger = LogManager.getLogger(JobConfigProvider.class); + public static final Map TO_XCONTENT_PARAMS; + static { + Map modifiable = new HashMap<>(); + modifiable.put(ToXContentParams.FOR_INTERNAL_STORAGE, "true"); + TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable); + } + private final Client client; public JobConfigProvider(Client client) { @@ -107,7 +117,7 @@ public JobConfigProvider(Client client) { */ public void putJob(Job job, ActionListener listener) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - XContentBuilder source = job.toXContent(builder, ToXContent.EMPTY_PARAMS); + XContentBuilder source = job.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(job.getId())) .setSource(source) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index c7d21e3686642..8635d9e3153bf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -402,7 +402,7 @@ public void onFailure(Exception e) { } } - public void openJob(JobTask jobTask, Consumer handler) { + public void openJob(JobTask jobTask, Consumer closeHandler) { String jobId = jobTask.getJobId(); logger.info("Opening job [{}]", jobId); @@ -410,7 +410,7 @@ public void openJob(JobTask jobTask, Consumer handler) { // NORELEASE JIndex. Should not be doing this work on the network thread job -> { if (job.getJobVersion() == null) { - handler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId + closeHandler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId + "] because jobs created prior to version 5.5 are not supported")); return; } @@ -422,7 +422,7 @@ public void openJob(JobTask jobTask, Consumer handler) { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - handler.accept(e); + closeHandler.accept(e); } @Override @@ -439,7 +439,7 @@ protected void doRun() { } try { - createProcessAndSetRunning(processContext, job, params, handler); + createProcessAndSetRunning(processContext, job, params, closeHandler); processContext.getAutodetectCommunicator().init(params.modelSnapshot()); setJobState(jobTask, JobState.OPENED); } catch (Exception e1) { @@ -452,17 +452,17 @@ protected void doRun() { .kill(); processByAllocation.remove(jobTask.getAllocationId()); } finally { - setJobState(jobTask, JobState.FAILED, e2 -> handler.accept(e1)); + setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1)); } } } }); }, e1 -> { logger.warn("Failed to gather information required to open job [" + jobId + "]", e1); - setJobState(jobTask, JobState.FAILED, e2 -> handler.accept(e1)); + setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1)); }); }, - handler + closeHandler )); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java index 90a1d45f9e15e..f6754ff7d039d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -14,27 +15,60 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.junit.Before; import java.net.InetAddress; import java.util.Collections; +import java.util.concurrent.ExecutorService; import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; public class MlAssignmentNotifierTests extends ESTestCase { + private Auditor auditor; + private ClusterService clusterService; + private ThreadPool threadPool; + private MlConfigMigrator configMigrator; + + @Before + @SuppressWarnings("unchecked") + private void setupMocks() { + auditor = mock(Auditor.class); + clusterService = mock(ClusterService.class); + threadPool = mock(ThreadPool.class); + configMigrator = mock(MlConfigMigrator.class); + threadPool = mock(ThreadPool.class); + + ExecutorService executorService = mock(ExecutorService.class); + org.elasticsearch.mock.orig.Mockito.doAnswer(invocation -> { + ((Runnable) invocation.getArguments()[0]).run(); + return null; + }).when(executorService).execute(any(Runnable.class)); + when(threadPool.executor(anyString())).thenReturn(executorService); + + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + listener.onResponse(Boolean.TRUE); + return null; + }).when(configMigrator).migrateConfigsWithoutTasks(any(ClusterState.class), any(ActionListener.class)); + } + public void testClusterChanged_info() { - Auditor auditor = mock(Auditor.class); - ClusterService clusterService = mock(ClusterService.class); - MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, clusterService); + MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService); notifier.onMaster(); DiscoveryNode node = @@ -53,6 +87,7 @@ public void testClusterChanged_info() { .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous)); verify(auditor, times(1)).info(eq("job_id"), any()); + verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(state), any()); notifier.offMaster(); notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous)); @@ -60,9 +95,7 @@ public void testClusterChanged_info() { } public void testClusterChanged_warning() { - Auditor auditor = mock(Auditor.class); - ClusterService clusterService = mock(ClusterService.class); - MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, clusterService); + MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService); notifier.onMaster(); ClusterState previous = ClusterState.builder(new ClusterName("_name")) @@ -78,10 +111,66 @@ public void testClusterChanged_warning() { .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous)); verify(auditor, times(1)).warning(eq("job_id"), any()); + verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(state), any()); notifier.offMaster(); notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous)); verifyNoMoreInteractions(auditor); } + public void testClusterChanged_noPersistentTaskChanges() { + MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService); + notifier.onMaster(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", null, null, tasksBuilder); + MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build(); + ClusterState previous = ClusterState.builder(new ClusterName("_name")) + .metaData(metaData) + .build(); + + ClusterState current = ClusterState.builder(new ClusterName("_name")) + .metaData(metaData) + .build(); + + notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous)); + verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any()); + + notifier.offMaster(); + verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any()); + } + + public void testMigrateNotTriggered_GivenPre66Nodes() { + MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService); + notifier.onMaster(); + + ClusterState previous = ClusterState.builder(new ClusterName("_name")) + .build(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", null, null, tasksBuilder); + MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build(); + + // mixed 6.5 and 6.6 nodes + ClusterState current = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) + .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) + .metaData(metaData) + .build(); + + notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous)); + verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any()); + + current = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_6_0)) + .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) + .metaData(metaData) + .build(); + + // all 6.6 nodes + notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous)); + verify(configMigrator, times(1)).migrateConfigsWithoutTasks(any(), any()); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java new file mode 100644 index 0000000000000..faaf4425dfb02 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java @@ -0,0 +1,222 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml; + +import org.elasticsearch.Version; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.config.JobTests; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MlConfigMigratorTests extends ESTestCase { + + public void testNonDeletingJobs() { + Job job1 = JobTests.buildJobBuilder("openjob1").build(); + Job job2 = JobTests.buildJobBuilder("openjob2").build(); + Job deletingJob = JobTests.buildJobBuilder("deleting-job").setDeleting(true).build(); + + assertThat(MlConfigMigrator.nonDeletingJobs(Arrays.asList(job1, job2, deletingJob)), containsInAnyOrder(job1, job2)); + } + + public void testClosedJobConfigs() { + Job openJob1 = JobTests.buildJobBuilder("openjob1").build(); + Job openJob2 = JobTests.buildJobBuilder("openjob2").build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder() + .putJob(openJob1, false) + .putJob(openJob2, false) + .putDatafeed(createCompatibleDatafeed(openJob1.getId()), Collections.emptyMap()); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData.builder().build()) + ) + .build(); + + assertThat(MlConfigMigrator.closedJobConfigs(clusterState), containsInAnyOrder(openJob1, openJob2)); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.jobTaskId("openjob1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + + clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) + ) + .build(); + + assertThat(MlConfigMigrator.closedJobConfigs(clusterState), containsInAnyOrder(openJob2)); + } + + public void testStoppedDatafeedConfigs() { + Job openJob1 = JobTests.buildJobBuilder("openjob1").build(); + Job openJob2 = JobTests.buildJobBuilder("openjob2").build(); + DatafeedConfig datafeedConfig1 = createCompatibleDatafeed(openJob1.getId()); + DatafeedConfig datafeedConfig2 = createCompatibleDatafeed(openJob2.getId()); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder() + .putJob(openJob1, false) + .putJob(openJob2, false) + .putDatafeed(datafeedConfig1, Collections.emptyMap()) + .putDatafeed(datafeedConfig2, Collections.emptyMap()); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData.builder().build()) + ) + .build(); + + assertThat(MlConfigMigrator.stoppedDatafeedConfigs(clusterState), containsInAnyOrder(datafeedConfig1, datafeedConfig2)); + + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.jobTaskId("openjob1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + tasksBuilder.addTask(MlTasks.datafeedTaskId(datafeedConfig1.getId()), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(datafeedConfig1.getId(), 0L), + new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment")); + + clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) + ) + .build(); + + assertThat(MlConfigMigrator.stoppedDatafeedConfigs(clusterState), containsInAnyOrder(datafeedConfig2)); + } + + public void testUpdateJobForMigration() { + Job.Builder oldJob = JobTests.buildJobBuilder("pre-migration"); + Version oldVersion = Version.V_6_3_0; + oldJob.setJobVersion(oldVersion); + + Job migratedJob = MlConfigMigrator.updateJobForMigration(oldJob.build()); + assertEquals(Version.CURRENT, migratedJob.getJobVersion()); + assertTrue(migratedJob.getCustomSettings().containsKey(MlConfigMigrator.MIGRATED_FROM_VERSION)); + assertEquals(oldVersion, migratedJob.getCustomSettings().get(MlConfigMigrator.MIGRATED_FROM_VERSION)); + } + + public void testUpdateJobForMigration_GivenV54Job() { + Job.Builder oldJob = JobTests.buildJobBuilder("pre-migration"); + // v5.4 jobs did not have a version and should not have a new one set + oldJob.setJobVersion(null); + + Job migratedJob = MlConfigMigrator.updateJobForMigration(oldJob.build()); + assertNull(migratedJob.getJobVersion()); + assertTrue(migratedJob.getCustomSettings().containsKey(MlConfigMigrator.MIGRATED_FROM_VERSION)); + } + + public void testFilterFailedJobConfigWrites() { + List jobs = new ArrayList<>(); + jobs.add(JobTests.buildJobBuilder("foo").build()); + jobs.add(JobTests.buildJobBuilder("bar").build()); + jobs.add(JobTests.buildJobBuilder("baz").build()); + + assertThat(MlConfigMigrator.filterFailedJobConfigWrites(Collections.emptySet(), jobs), hasSize(3)); + assertThat(MlConfigMigrator.filterFailedJobConfigWrites(Collections.singleton(Job.documentId("bar")), jobs), + contains("foo", "baz")); + } + + public void testFilterFailedDatafeedConfigWrites() { + List datafeeds = new ArrayList<>(); + datafeeds.add(createCompatibleDatafeed("foo")); + datafeeds.add(createCompatibleDatafeed("bar")); + datafeeds.add(createCompatibleDatafeed("baz")); + + assertThat(MlConfigMigrator.filterFailedDatafeedConfigWrites(Collections.emptySet(), datafeeds), hasSize(3)); + assertThat(MlConfigMigrator.filterFailedDatafeedConfigWrites(Collections.singleton(DatafeedConfig.documentId("df-foo")), datafeeds), + contains("df-bar", "df-baz")); + } + + public void testDocumentsNotWritten() { + BulkItemResponse ok = mock(BulkItemResponse.class); + when(ok.isFailed()).thenReturn(false); + + BulkItemResponse failed = mock(BulkItemResponse.class); + when(failed.isFailed()).thenReturn(true); + BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class); + when(failure.getId()).thenReturn("failed-doc-id"); + when(failure.getCause()).thenReturn(mock(IllegalStateException.class)); + when(failed.getFailure()).thenReturn(failure); + + BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[] {ok, failed}, 1L); + Set docsIds = MlConfigMigrator.documentsNotWritten(bulkResponse); + assertThat(docsIds, contains("failed-doc-id")); + } + + public void testRemoveJobsAndDatafeeds_removeAll() { + Job job1 = JobTests.buildJobBuilder("job1").build(); + Job job2 = JobTests.buildJobBuilder("job2").build(); + DatafeedConfig datafeedConfig1 = createCompatibleDatafeed(job1.getId()); + DatafeedConfig datafeedConfig2 = createCompatibleDatafeed(job2.getId()); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder() + .putJob(job1, false) + .putJob(job2, false) + .putDatafeed(datafeedConfig1, Collections.emptyMap()) + .putDatafeed(datafeedConfig2, Collections.emptyMap()); + + MlConfigMigrator.RemovalResult removalResult = MlConfigMigrator.removeJobsAndDatafeeds( + Arrays.asList("job1", "job2"), Arrays.asList("df-job1", "df-job2"), mlMetadata.build()); + + assertThat(removalResult.mlMetadata.getJobs().keySet(), empty()); + assertThat(removalResult.mlMetadata.getDatafeeds().keySet(), empty()); + assertThat(removalResult.removedJobIds, contains("job1", "job2")); + assertThat(removalResult.removedDatafeedIds, contains("df-job1", "df-job2")); + } + + public void testRemoveJobsAndDatafeeds_removeSome() { + Job job1 = JobTests.buildJobBuilder("job1").build(); + Job job2 = JobTests.buildJobBuilder("job2").build(); + DatafeedConfig datafeedConfig1 = createCompatibleDatafeed(job1.getId()); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder() + .putJob(job1, false) + .putJob(job2, false) + .putDatafeed(datafeedConfig1, Collections.emptyMap()); + + MlConfigMigrator.RemovalResult removalResult = MlConfigMigrator.removeJobsAndDatafeeds( + Arrays.asList("job1", "job-none"), Collections.singletonList("df-none"), mlMetadata.build()); + + assertThat(removalResult.mlMetadata.getJobs().keySet(), contains("job2")); + assertThat(removalResult.mlMetadata.getDatafeeds().keySet(), contains("df-job1")); + assertThat(removalResult.removedJobIds, contains("job1")); + assertThat(removalResult.removedDatafeedIds, empty()); + } + + + private DatafeedConfig createCompatibleDatafeed(String jobId) { + // create a datafeed without aggregations or anything + // else that may cause validation errors + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("df-" + jobId, jobId); + datafeedBuilder.setIndices(Collections.singletonList("my_index")); + return datafeedBuilder.build(); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index 5ded1b205a110..f96fe79908c15 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -5,22 +5,13 @@ */ package org.elasticsearch.xpack.ml; -import org.elasticsearch.Version; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.junit.Before; -import java.net.InetAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; @@ -62,46 +53,21 @@ public void setUpMocks() { public void testInitialize() { MlInitializationService initializationService = new MlInitializationService(threadPool, clusterService, client); - - ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) - .localNodeId("_node_id") - .masterNodeId("_node_id")) - .metaData(MetaData.builder()) - .build(); - initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - + initializationService.onMaster(); assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(true)); } public void testInitialize_noMasterNode() { MlInitializationService initializationService = new MlInitializationService(threadPool, clusterService, client); - - ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))) - .metaData(MetaData.builder()) - .build(); - initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - + initializationService.offMaster(); assertThat(initializationService.getDailyMaintenanceService(), is(nullValue())); } public void testInitialize_alreadyInitialized() { MlInitializationService initializationService = new MlInitializationService(threadPool, clusterService, client); - - ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) - .localNodeId("_node_id") - .masterNodeId("_node_id")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build())) - .build(); MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class); initializationService.setDailyMaintenanceService(initialDailyMaintenanceService); - initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); + initializationService.onMaster(); assertSame(initialDailyMaintenanceService, initializationService.getDailyMaintenanceService()); } @@ -111,23 +77,10 @@ public void testNodeGoesFromMasterToNonMasterAndBack() { MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class); initializationService.setDailyMaintenanceService(initialDailyMaintenanceService); - ClusterState masterCs = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) - .localNodeId("_node_id") - .masterNodeId("_node_id")) - .metaData(MetaData.builder()) - .build(); - ClusterState noMasterCs = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))) - .metaData(MetaData.builder()) - .build(); - initializationService.clusterChanged(new ClusterChangedEvent("_source", noMasterCs, masterCs)); - + initializationService.offMaster(); verify(initialDailyMaintenanceService).stop(); - initializationService.clusterChanged(new ClusterChangedEvent("_source", masterCs, noMasterCs)); + initializationService.onMaster(); MlDailyMaintenanceService finalDailyMaintenanceService = initializationService.getDailyMaintenanceService(); assertNotSame(initialDailyMaintenanceService, finalDailyMaintenanceService); assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(true)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java new file mode 100644 index 0000000000000..51a3b5d2366b0 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -0,0 +1,143 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.MlConfigMigrator; +import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +public class MlConfigMigratorIT extends MlSingleNodeTestCase { + + public void testWriteConfigToIndex() throws InterruptedException { + + final String indexJobId = "job-already-migrated"; + // Add a job to the index + JobConfigProvider jobConfigProvider = new JobConfigProvider(client()); + Job indexJob = buildJobBuilder(indexJobId).build(); + // Same as index job but has extra fields in its custom settings + // which will be used to check the config was overwritten + Job migratedJob = MlConfigMigrator.updateJobForMigration(indexJob); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference indexResponseHolder = new AtomicReference<>(); + // put a job representing a previously migrated job + blockingCall(actionListener -> jobConfigProvider.putJob(migratedJob, actionListener), indexResponseHolder, exceptionHolder); + + ClusterService clusterService = mock(ClusterService.class); + MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(client(), clusterService); + + AtomicReference> failedIdsHolder = new AtomicReference<>(); + Job foo = buildJobBuilder("foo").build(); + // try to write foo and 'job-already-migrated' which does not have the custom setting field + assertNull(indexJob.getCustomSettings()); + + blockingCall(actionListener -> mlConfigMigrator.writeConfigToIndex(Collections.emptyList(), + Arrays.asList(indexJob, foo), actionListener), + failedIdsHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertThat(failedIdsHolder.get(), empty()); + + // Check job foo has been indexed and job-already-migrated has been overwritten + AtomicReference> jobsHolder = new AtomicReference<>(); + blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, false, actionListener), + jobsHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertThat(jobsHolder.get(), hasSize(2)); + Job fooJob = jobsHolder.get().get(0).build(); + assertEquals("foo", fooJob.getId()); + // this job won't have been marked as migrated as calling + // MlConfigMigrator.writeConfigToIndex directly does not do that + assertNull(fooJob.getCustomSettings()); + Job alreadyMigratedJob = jobsHolder.get().get(1).build(); + assertEquals("job-already-migrated", alreadyMigratedJob.getId()); + assertNull(alreadyMigratedJob.getCustomSettings()); + } + + public void testMigrateConfigs() throws InterruptedException { + + // and jobs and datafeeds clusterstate + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder("job-foo").build(), false); + mlMetadata.putJob(buildJobBuilder("job-bar").build(), false); + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("df-1", "job-foo"); + builder.setIndices(Collections.singletonList("beats*")); + mlMetadata.putDatafeed(builder.build(), Collections.emptyMap()); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + + ClusterService clusterService = mock(ClusterService.class); + doAnswer(invocation -> { + ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1]; + listener.clusterStateProcessed("source", mock(ClusterState.class), mock(ClusterState.class)); + return null; + }).when(clusterService).submitStateUpdateTask(eq("remove-migrated-ml-configs"), any()); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference responseHolder = new AtomicReference<>(); + + // do the migration + MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(client(), clusterService); + blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + responseHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertTrue(responseHolder.get()); + + // check the jobs have been migrated + AtomicReference> jobsHolder = new AtomicReference<>(); + JobConfigProvider jobConfigProvider = new JobConfigProvider(client()); + blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener), + jobsHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertThat(jobsHolder.get(), hasSize(2)); + assertTrue(jobsHolder.get().get(0).build().getCustomSettings().containsKey(MlConfigMigrator.MIGRATED_FROM_VERSION)); + assertEquals("job-bar", jobsHolder.get().get(0).build().getId()); + assertTrue(jobsHolder.get().get(1).build().getCustomSettings().containsKey(MlConfigMigrator.MIGRATED_FROM_VERSION)); + assertEquals("job-foo", jobsHolder.get().get(1).build().getId()); + + // check datafeeds are migrated + DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client(), xContentRegistry()); + AtomicReference> datafeedsHolder = new AtomicReference<>(); + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", true, actionListener), + datafeedsHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertThat(datafeedsHolder.get(), hasSize(1)); + assertEquals("df-1", datafeedsHolder.get().get(0).getId()); + } +} + + diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml index f40c0439fa64c..227005185fc0d 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml @@ -62,34 +62,34 @@ setup: job_id: migration-old-cluster-open-job - is_true: jobs.0.finished_time - - - do: - xpack.ml.delete_datafeed: - datafeed_id: migration-old-cluster-started-datafeed - - - do: - xpack.ml.delete_job: - job_id: migration-old-cluster-open-job - - - do: - catch: missing - xpack.ml.get_jobs: - job_id: migration-old-cluster-open-job - - - do: - xpack.ml.delete_datafeed: - datafeed_id: migration-old-cluster-stopped-datafeed - - - do: - xpack.ml.delete_job: - job_id: migration-old-cluster-closed-job - - - do: - xpack.ml.get_jobs: - job_id: migration* - - match: { count: 0 } - - - do: - xpack.ml.get_datafeeds: - datafeed_id: migration* - - match: { count: 0 } +# TODO cannot test delete as config may be migrating +# - do: +# xpack.ml.delete_datafeed: +# datafeed_id: migration-old-cluster-started-datafeed +# +# - do: +# xpack.ml.delete_job: +# job_id: migration-old-cluster-open-job +# +# - do: +# catch: missing +# xpack.ml.get_jobs: +# job_id: migration-old-cluster-open-job +# +# - do: +# xpack.ml.delete_datafeed: +# datafeed_id: migration-old-cluster-stopped-datafeed +# +# - do: +# xpack.ml.delete_job: +# job_id: migration-old-cluster-closed-job +# +# - do: +# xpack.ml.get_jobs: +# job_id: migration* +# - match: { count: 0 } +# +# - do: +# xpack.ml.get_datafeeds: +# datafeed_id: migration* +# - match: { count: 0 }