Skip to content

[ML] ML legacy index templates that are no longer needed should be deleted #80876

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlInitializationService;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.junit.After;
import org.junit.Before;
Expand All @@ -62,6 +63,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
Expand Down Expand Up @@ -251,6 +253,14 @@ public void testDedicatedMlNode() throws Exception {
}
ensureStableCluster(3);

// By now the ML initialization service should have realised that there are no
// legacy ML templates in the cluster and it doesn't need to keep checking
MlInitializationService mlInitializationService = internalCluster().getInstance(
MlInitializationService.class,
internalCluster().getMasterName()
);
assertBusy(() -> assertThat(mlInitializationService.checkForLegacyMlTemplates(), is(false)));

String jobId = "dedicated-ml-node-job";
Job.Builder job = createJob(jobId, ByteSizeValue.ofMb(2));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
Expand All @@ -20,14 +21,19 @@
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
Expand All @@ -50,10 +56,25 @@ public class MlInitializationService implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(MlInitializationService.class);

public static final List<String> LEGACY_ML_INDEX_TEMPLATES = List.of(
".ml-anomalies-",
".ml-config",
".ml-inference-000001",
".ml-inference-000002",
".ml-inference-000003",
".ml-meta",
".ml-notifications",
".ml-notifications-000001",
".ml-state",
".ml-stats"
);

private final Client client;
private final ThreadPool threadPool;
private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false);
private final AtomicBoolean mlInternalIndicesHidden = new AtomicBoolean(false);
private final AtomicBoolean mlLegacyTemplateDeletionInProgress = new AtomicBoolean(false);
private final AtomicBoolean checkForLegacyMlTemplates = new AtomicBoolean(true);
private volatile String previousException;

private final MlDailyMaintenanceService mlDailyMaintenanceService;
Expand Down Expand Up @@ -154,6 +175,56 @@ public void clusterChanged(ClusterChangedEvent event) {
})
);
}

// The atomic flag shortcircuits the check after no legacy templates have been found to exist.
if (this.isMaster && checkForLegacyMlTemplates.get()) {
if (deleteOneMlLegacyTemplateIfNecessary(client, event.state()) == false) {
checkForLegacyMlTemplates.set(false);
}
}
}

/**
* @return <code>true</code> if further calls to this method are worthwhile.
* <code>false</code> if this method never needs to be called again.
*/
private boolean deleteOneMlLegacyTemplateIfNecessary(Client client, ClusterState state) {

String templateToDelete = nextTemplateToDelete(state.getMetadata().getTemplates());
if (templateToDelete != null) {
// This atomic flag prevents multiple simultaneous attempts to delete a legacy index
// template if there is a flurry of cluster state updates in quick succession.
if (mlLegacyTemplateDeletionInProgress.compareAndSet(false, true) == false) {
return true;
}
executeAsyncWithOrigin(
client,
ML_ORIGIN,
DeleteIndexTemplateAction.INSTANCE,
new DeleteIndexTemplateRequest(templateToDelete),
ActionListener.wrap(r -> {
mlLegacyTemplateDeletionInProgress.set(false);
logger.debug("Deleted legacy ML index template [{}]", templateToDelete);
}, e -> {
mlLegacyTemplateDeletionInProgress.set(false);
logger.debug(new ParameterizedMessage("Error deleting legacy ML index template [{}]", templateToDelete), e);
})
);

return true;
}

// We should never need to check again
return false;
}

private String nextTemplateToDelete(ImmutableOpenMap<String, IndexTemplateMetadata> legacyTemplates) {
for (String mlLegacyTemplate : LEGACY_ML_INDEX_TEMPLATES) {
if (legacyTemplates.containsKey(mlLegacyTemplate)) {
return mlLegacyTemplate;
}
}
return null;
}

/** For testing */
Expand All @@ -166,6 +237,11 @@ public boolean areMlInternalIndicesHidden() {
return mlInternalIndicesHidden.get();
}

/** For testing */
public boolean checkForLegacyMlTemplates() {
return checkForLegacyMlTemplates.get();
}

private void makeMlInternalIndicesHidden() {
String[] mlHiddenIndexPatterns = MachineLearning.getMlHiddenIndexPatterns();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
Expand All @@ -42,12 +41,10 @@ protected Settings restClientSettings() {

@Before
public void waitForMlTemplates() throws Exception {
List<String> templatesToWaitFor = (isRunningAgainstOldCluster() && getOldClusterVersion().before(Version.V_7_12_0))
? XPackRestTestConstants.ML_POST_V660_TEMPLATES
: XPackRestTestConstants.ML_POST_V7120_TEMPLATES;
boolean clusterUnderstandsComposableTemplates = isRunningAgainstOldCluster() == false
|| getOldClusterVersion().onOrAfter(Version.V_7_8_0);
XPackRestTestHelper.waitForTemplates(client(), templatesToWaitFor, clusterUnderstandsComposableTemplates);
// We shouldn't wait for ML templates during the upgrade - production won't
if (isRunningAgainstOldCluster()) {
XPackRestTestHelper.waitForTemplates(client(), XPackRestTestConstants.ML_POST_V7120_TEMPLATES, true);
}
}

public void testMlConfigIndexMappingsAfterMigration() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,10 @@ protected Settings restClientSettings() {

@Before
public void waitForMlTemplates() throws Exception {
List<String> templatesToWaitFor = (isRunningAgainstOldCluster() && getOldClusterVersion().before(Version.V_7_12_0))
? XPackRestTestConstants.ML_POST_V660_TEMPLATES
: XPackRestTestConstants.ML_POST_V7120_TEMPLATES;
boolean clusterUnderstandsComposableTemplates = isRunningAgainstOldCluster() == false
|| getOldClusterVersion().onOrAfter(Version.V_7_8_0);
XPackRestTestHelper.waitForTemplates(client(), templatesToWaitFor, clusterUnderstandsComposableTemplates);
// We shouldn't wait for ML templates during the upgrade - production won't
if (isRunningAgainstOldCluster()) {
XPackRestTestHelper.waitForTemplates(client(), XPackRestTestConstants.ML_POST_V7120_TEMPLATES, true);
}
}

public void testMlIndicesBecomeHidden() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/
package org.elasticsearch.xpack.restart;

import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -50,12 +49,10 @@ protected Settings restClientSettings() {

@Before
public void waitForMlTemplates() throws Exception {
List<String> templatesToWaitFor = (isRunningAgainstOldCluster() && getOldClusterVersion().before(Version.V_7_12_0))
? XPackRestTestConstants.ML_POST_V660_TEMPLATES
: XPackRestTestConstants.ML_POST_V7120_TEMPLATES;
boolean clusterUnderstandsComposableTemplates = isRunningAgainstOldCluster() == false
|| getOldClusterVersion().onOrAfter(Version.V_7_8_0);
XPackRestTestHelper.waitForTemplates(client(), templatesToWaitFor, clusterUnderstandsComposableTemplates);
// We shouldn't wait for ML templates during the upgrade - production won't
if (isRunningAgainstOldCluster()) {
XPackRestTestHelper.waitForTemplates(client(), XPackRestTestConstants.ML_POST_V7120_TEMPLATES, true);
}
}

private void createTestIndex() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@ private static class HLRC extends RestHighLevelClient {

@Override
protected Collection<String> templatesToWaitFor() {
List<String> templatesToWaitFor = UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_12_0)
? XPackRestTestConstants.ML_POST_V7120_TEMPLATES
: XPackRestTestConstants.ML_POST_V660_TEMPLATES;
return Stream.concat(templatesToWaitFor.stream(), super.templatesToWaitFor().stream()).collect(Collectors.toSet());
// We shouldn't wait for ML templates during the upgrade - production won't
if (CLUSTER_TYPE != ClusterType.OLD) {
return super.templatesToWaitFor();
}
return Stream.concat(XPackRestTestConstants.ML_POST_V7120_TEMPLATES.stream(), super.templatesToWaitFor().stream())
.collect(Collectors.toSet());
}

protected static void waitForPendingUpgraderTasks() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ml.job.config.AnalysisConfig;
import org.elasticsearch.client.ml.job.config.DataDescription;
import org.elasticsearch.client.ml.job.config.Detector;
Expand All @@ -22,23 +23,26 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;

public class MlMappingsUpgradeIT extends AbstractUpgradeTestCase {

private static final String JOB_ID = "ml-mappings-upgrade-job";

@Override
protected Collection<String> templatesToWaitFor() {
List<String> templatesToWaitFor = UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_12_0)
? XPackRestTestConstants.ML_POST_V7120_TEMPLATES
: XPackRestTestConstants.ML_POST_V660_TEMPLATES;
return Stream.concat(templatesToWaitFor.stream(), super.templatesToWaitFor().stream()).collect(Collectors.toSet());
// We shouldn't wait for ML templates during the upgrade - production won't
if (CLUSTER_TYPE != ClusterType.OLD) {
return super.templatesToWaitFor();
}
return Stream.concat(XPackRestTestConstants.ML_POST_V7120_TEMPLATES.stream(), super.templatesToWaitFor().stream())
.collect(Collectors.toSet());
}

/**
Expand All @@ -59,6 +63,7 @@ public void testMappingsUpgrade() throws Exception {
assertUpgradedAnnotationsMappings();
closeAndReopenTestJob();
assertUpgradedConfigMappings();
assertMlLegacyTemplatesDeleted();
IndexMappingTemplateAsserter.assertMlMappingsMatchTemplates(client());
break;
default:
Expand Down Expand Up @@ -168,6 +173,29 @@ private void assertUpgradedAnnotationsMappings() throws Exception {
});
}

private void assertMlLegacyTemplatesDeleted() throws Exception {

// All the legacy ML templates we created over the years should be deleted now they're no longer needed
assertBusy(() -> {
Request request = new Request("GET", "/_template/.ml*");
try {
Response response = client().performRequest(request);
Map<String, Object> responseLevel = entityAsMap(response);
assertNotNull(responseLevel);
// If we get here the test has failed, but it's critical that we find out which templates
// existed, hence not using expectThrows() above
assertThat(responseLevel.keySet(), empty());
} catch (ResponseException e) {
// Not found is fine
assertThat(
"Unexpected failure getting ML templates: " + e.getResponse().getStatusLine(),
e.getResponse().getStatusLine().getStatusCode(),
is(404)
);
}
});
}

@SuppressWarnings("unchecked")
private void assertUpgradedConfigMappings() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public class MlTrainedModelsUpgradeIT extends AbstractUpgradeTestCase {

@Override
protected Collection<String> templatesToWaitFor() {
// We shouldn't wait for ML templates during the upgrade - production won't
if (CLUSTER_TYPE != ClusterType.OLD) {
return super.templatesToWaitFor();
}
return Stream.concat(XPackRestTestConstants.ML_POST_V7120_TEMPLATES.stream(), super.templatesToWaitFor().stream())
.collect(Collectors.toSet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,6 @@ public final class XPackRestTestConstants {
public static final String STATE_INDEX_PREFIX = ".ml-state";
public static final String RESULTS_INDEX_DEFAULT = "shared";

public static final List<String> ML_POST_V660_TEMPLATES = List.of(
ML_META_INDEX_NAME,
STATE_INDEX_PREFIX,
RESULTS_INDEX_PREFIX,
CONFIG_INDEX
);

public static final List<String> ML_POST_V7120_TEMPLATES = List.of(STATE_INDEX_PREFIX, RESULTS_INDEX_PREFIX);

// Transform constants:
Expand Down