Skip to content

Commit 632c7fb

Browse files
author
Hendrik Muhs
authored
[ML] fix x-pack usage regression caused by index migration (#36936)
Changes the feature usage retrieval to use the job manager rather than directly talking to the cluster state, because jobs can now be either in cluster state or stored in an index This is a follow-up of #36702 / #36698
1 parent d3f1fe4 commit 632c7fb

File tree

5 files changed

+136
-34
lines changed

5 files changed

+136
-34
lines changed

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java

+18
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1414
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
1515
import org.elasticsearch.common.util.concurrent.ThreadContext;
16+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1617
import org.elasticsearch.test.SecuritySettingsSourceField;
1718
import org.elasticsearch.test.rest.ESRestTestCase;
1819
import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
@@ -22,7 +23,9 @@
2223
import org.junit.After;
2324

2425
import java.io.IOException;
26+
import java.util.Collections;
2527
import java.util.Locale;
28+
import java.util.Map;
2629
import java.util.concurrent.atomic.AtomicInteger;
2730
import java.util.concurrent.atomic.AtomicReference;
2831
import java.util.regex.Matcher;
@@ -111,6 +114,21 @@ public void testGetJobs_GivenMultipleJobs() throws Exception {
111114
assertThat(implicitAll, containsString("\"job_id\":\"given-multiple-jobs-job-3\""));
112115
}
113116

117+
// tests the _xpack/usage endpoint
118+
public void testUsage() throws IOException {
119+
createFarequoteJob("job-1");
120+
createFarequoteJob("job-2");
121+
Map<String, Object> usage = entityAsMap(client().performRequest(new Request("GET", "_xpack/usage")));
122+
assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage));
123+
assertEquals(2, XContentMapValues.extractValue("ml.jobs.closed.count", usage));
124+
Response openResponse = client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/job-1/_open"));
125+
assertEquals(Collections.singletonMap("opened", true), entityAsMap(openResponse));
126+
usage = entityAsMap(client().performRequest(new Request("GET", "_xpack/usage")));
127+
assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage));
128+
assertEquals(1, XContentMapValues.extractValue("ml.jobs.closed.count", usage));
129+
assertEquals(1, XContentMapValues.extractValue("ml.jobs.opened.count", usage));
130+
}
131+
114132
private Response createFarequoteJob(String jobId) throws IOException {
115133
Request request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
116134
request.setJsonEntity(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@
165165
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
166166
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
167167
import org.elasticsearch.xpack.ml.job.JobManager;
168+
import org.elasticsearch.xpack.ml.job.JobManagerHolder;
168169
import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier;
169170
import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizer;
170171
import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizerFactory;
@@ -375,7 +376,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
375376
NamedXContentRegistry xContentRegistry, Environment environment,
376377
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
377378
if (enabled == false || transportClientMode) {
378-
return emptyList();
379+
// special holder for @link(MachineLearningFeatureSetUsage) which needs access to job manager, empty if ML is disabled
380+
return Collections.singletonList(new JobManagerHolder());
379381
}
380382

381383
Auditor auditor = new Auditor(client, clusterService.getNodeName());
@@ -385,6 +387,9 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
385387
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(client, clusterService, threadPool);
386388
JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, threadPool, client, notifier);
387389

390+
// special holder for @link(MachineLearningFeatureSetUsage) which needs access to job manager if ML is enabled
391+
JobManagerHolder jobManagerHolder = new JobManagerHolder(jobManager);
392+
388393
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
389394
JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
390395

@@ -443,6 +448,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
443448
jobConfigProvider,
444449
datafeedConfigProvider,
445450
jobManager,
451+
jobManagerHolder,
446452
autodetectProcessManager,
447453
new MlInitializationService(settings, threadPool, clusterService, client),
448454
jobDataCountsPersister,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java

+21-18
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525
import org.elasticsearch.xpack.core.XPackSettings;
2626
import org.elasticsearch.xpack.core.XPackField;
2727
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
28-
import org.elasticsearch.xpack.core.ml.MlMetadata;
2928
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
3029
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
3130
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
3231
import org.elasticsearch.xpack.core.ml.job.config.Job;
3332
import org.elasticsearch.xpack.core.ml.job.config.JobState;
33+
import org.elasticsearch.xpack.ml.job.JobManagerHolder;
3434
import org.elasticsearch.xpack.ml.process.NativeController;
3535
import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
3636
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
@@ -47,6 +47,7 @@
4747
import java.util.Map;
4848
import java.util.Objects;
4949
import java.util.concurrent.TimeoutException;
50+
import java.util.stream.Collectors;
5051

5152
public class MachineLearningFeatureSet implements XPackFeatureSet {
5253

@@ -60,15 +61,17 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
6061
private final XPackLicenseState licenseState;
6162
private final ClusterService clusterService;
6263
private final Client client;
64+
private final JobManagerHolder jobManagerHolder;
6365
private final Map<String, Object> nativeCodeInfo;
6466

6567
@Inject
6668
public MachineLearningFeatureSet(Environment environment, ClusterService clusterService, Client client,
67-
@Nullable XPackLicenseState licenseState) {
69+
@Nullable XPackLicenseState licenseState, JobManagerHolder jobManagerHolder) {
6870
this.enabled = XPackSettings.MACHINE_LEARNING_ENABLED.get(environment.settings());
6971
this.clusterService = Objects.requireNonNull(clusterService);
7072
this.client = Objects.requireNonNull(client);
7173
this.licenseState = licenseState;
74+
this.jobManagerHolder = jobManagerHolder;
7275
Map<String, Object> nativeCodeInfo = NativeController.UNKNOWN_NATIVE_CODE_INFO;
7376
// Don't try to get the native code version if ML is disabled - it causes too much controversy
7477
// if ML has been disabled because of some OS incompatibility. Also don't try to get the native
@@ -133,7 +136,7 @@ public Map<String, Object> nativeCodeInfo() {
133136
@Override
134137
public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
135138
ClusterState state = clusterService.state();
136-
new Retriever(client, MlMetadata.getMlMetadata(state), available(), enabled(), mlNodeCount(state)).execute(listener);
139+
new Retriever(client, jobManagerHolder, available(), enabled(), mlNodeCount(state)).execute(listener);
137140
}
138141

139142
private int mlNodeCount(final ClusterState clusterState) {
@@ -153,16 +156,16 @@ private int mlNodeCount(final ClusterState clusterState) {
153156
public static class Retriever {
154157

155158
private final Client client;
156-
private final MlMetadata mlMetadata;
159+
private final JobManagerHolder jobManagerHolder;
157160
private final boolean available;
158161
private final boolean enabled;
159162
private Map<String, Object> jobsUsage;
160163
private Map<String, Object> datafeedsUsage;
161164
private int nodeCount;
162165

163-
public Retriever(Client client, MlMetadata mlMetadata, boolean available, boolean enabled, int nodeCount) {
166+
public Retriever(Client client, JobManagerHolder jobManagerHolder, boolean available, boolean enabled, int nodeCount) {
164167
this.client = Objects.requireNonNull(client);
165-
this.mlMetadata = mlMetadata;
168+
this.jobManagerHolder = jobManagerHolder;
166169
this.available = available;
167170
this.enabled = enabled;
168171
this.jobsUsage = new LinkedHashMap<>();
@@ -171,7 +174,8 @@ public Retriever(Client client, MlMetadata mlMetadata, boolean available, boolea
171174
}
172175

173176
public void execute(ActionListener<Usage> listener) {
174-
if (enabled == false) {
177+
// empty holder means either ML disabled or transport client mode
178+
if (jobManagerHolder.isEmpty()) {
175179
listener.onResponse(
176180
new MachineLearningFeatureSetUsage(available, enabled, Collections.emptyMap(), Collections.emptyMap(), 0));
177181
return;
@@ -191,20 +195,19 @@ public void execute(ActionListener<Usage> listener) {
191195
GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(MetaData.ALL);
192196
ActionListener<GetJobsStatsAction.Response> jobStatsListener = ActionListener.wrap(
193197
response -> {
194-
addJobsUsage(response);
195-
GetDatafeedsStatsAction.Request datafeedStatsRequest =
196-
new GetDatafeedsStatsAction.Request(GetDatafeedsStatsAction.ALL);
197-
client.execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest,
198-
datafeedStatsListener);
199-
},
200-
listener::onFailure
201-
);
198+
jobManagerHolder.getJobManager().expandJobs(MetaData.ALL, true, ActionListener.wrap(jobs -> {
199+
addJobsUsage(response, jobs.results());
200+
GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(
201+
GetDatafeedsStatsAction.ALL);
202+
client.execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest, datafeedStatsListener);
203+
}, listener::onFailure));
204+
}, listener::onFailure);
202205

203206
// Step 0. Kick off the chain of callbacks by requesting jobs stats
204207
client.execute(GetJobsStatsAction.INSTANCE, jobStatsRequest, jobStatsListener);
205208
}
206209

207-
private void addJobsUsage(GetJobsStatsAction.Response response) {
210+
private void addJobsUsage(GetJobsStatsAction.Response response, List<Job> jobs) {
208211
StatsAccumulator allJobsDetectorsStats = new StatsAccumulator();
209212
StatsAccumulator allJobsModelSizeStats = new StatsAccumulator();
210213
ForecastStats allJobsForecastStats = new ForecastStats();
@@ -214,11 +217,11 @@ private void addJobsUsage(GetJobsStatsAction.Response response) {
214217
Map<JobState, StatsAccumulator> modelSizeStatsByState = new HashMap<>();
215218
Map<JobState, ForecastStats> forecastStatsByState = new HashMap<>();
216219

217-
Map<String, Job> jobs = mlMetadata.getJobs();
218220
List<GetJobsStatsAction.Response.JobStats> jobsStats = response.getResponse().results();
221+
Map<String, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, item -> item));
219222
for (GetJobsStatsAction.Response.JobStats jobStats : jobsStats) {
220223
ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
221-
int detectorsCount = jobs.get(jobStats.getJobId()).getAnalysisConfig()
224+
int detectorsCount = jobMap.get(jobStats.getJobId()).getAnalysisConfig()
222225
.getDetectors().size();
223226
double modelSize = modelSizeStats == null ? 0.0
224227
: jobStats.getModelSizeStats().getModelBytes();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ml.job;
8+
9+
import org.elasticsearch.ElasticsearchException;
10+
11+
public class JobManagerHolder {
12+
13+
private final JobManager instance;
14+
15+
/**
16+
* Create an empty holder which also means that no job manager gets created.
17+
*/
18+
public JobManagerHolder() {
19+
this.instance = null;
20+
}
21+
22+
/**
23+
* Create a holder that allows lazy creation of a job manager.
24+
*
25+
*/
26+
public JobManagerHolder(JobManager jobManager) {
27+
this.instance = jobManager;
28+
}
29+
30+
public boolean isEmpty() {
31+
return instance == null;
32+
}
33+
34+
/**
35+
* Get the instance of the held JobManager.
36+
*
37+
* @return job manager instance
38+
* @throws ElasticsearchException if holder has been created with the empty constructor
39+
*/
40+
public JobManager getJobManager() {
41+
if (instance == null) {
42+
throw new ElasticsearchException("Tried to get job manager although Machine Learning is disabled");
43+
}
44+
return instance;
45+
}
46+
}

0 commit comments

Comments
 (0)