Skip to content

Commit 50950ce

Browse files
author
Hendrik Muhs
committed
[ML] fix x-pack usage regression caused by index migration (#36936)
Changes the feature usage retrieval to use the job manager rather than talking directly to the cluster state, because jobs can now be stored either in the cluster state or in an index. This is a follow-up of #36702 / #36698.
1 parent 5d000ed commit 50950ce

File tree

5 files changed

+136
-34
lines changed

5 files changed

+136
-34
lines changed

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java

+18
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1414
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
1515
import org.elasticsearch.common.util.concurrent.ThreadContext;
16+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1617
import org.elasticsearch.test.SecuritySettingsSourceField;
1718
import org.elasticsearch.test.rest.ESRestTestCase;
1819
import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
@@ -22,7 +23,9 @@
2223
import org.junit.After;
2324

2425
import java.io.IOException;
26+
import java.util.Collections;
2527
import java.util.Locale;
28+
import java.util.Map;
2629
import java.util.concurrent.atomic.AtomicInteger;
2730
import java.util.concurrent.atomic.AtomicReference;
2831
import java.util.regex.Matcher;
@@ -111,6 +114,21 @@ public void testGetJobs_GivenMultipleJobs() throws Exception {
111114
assertThat(implicitAll, containsString("\"job_id\":\"given-multiple-jobs-job-3\""));
112115
}
113116

117+
// tests the _xpack/usage endpoint
118+
public void testUsage() throws IOException {
119+
createFarequoteJob("job-1");
120+
createFarequoteJob("job-2");
121+
Map<String, Object> usage = entityAsMap(client().performRequest(new Request("GET", "_xpack/usage")));
122+
assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage));
123+
assertEquals(2, XContentMapValues.extractValue("ml.jobs.closed.count", usage));
124+
Response openResponse = client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/job-1/_open"));
125+
assertEquals(Collections.singletonMap("opened", true), entityAsMap(openResponse));
126+
usage = entityAsMap(client().performRequest(new Request("GET", "_xpack/usage")));
127+
assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage));
128+
assertEquals(1, XContentMapValues.extractValue("ml.jobs.closed.count", usage));
129+
assertEquals(1, XContentMapValues.extractValue("ml.jobs.opened.count", usage));
130+
}
131+
114132
private Response createFarequoteJob(String jobId) throws IOException {
115133
Request request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
116134
request.setJsonEntity(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@
165165
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
166166
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
167167
import org.elasticsearch.xpack.ml.job.JobManager;
168+
import org.elasticsearch.xpack.ml.job.JobManagerHolder;
168169
import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier;
169170
import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizer;
170171
import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizerFactory;
@@ -372,7 +373,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
372373
NamedXContentRegistry xContentRegistry, Environment environment,
373374
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
374375
if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) {
375-
return emptyList();
376+
// special holder for {@link MachineLearningFeatureSet}, which needs access to the job manager; the holder is empty if ML is disabled
377+
return Collections.singletonList(new JobManagerHolder());
376378
}
377379

378380
Auditor auditor = new Auditor(client, clusterService.getNodeName());
@@ -382,6 +384,9 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
382384
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(client, clusterService, threadPool);
383385
JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, threadPool, client, notifier);
384386

387+
// special holder for {@link MachineLearningFeatureSet}, which needs access to the job manager if ML is enabled
388+
JobManagerHolder jobManagerHolder = new JobManagerHolder(jobManager);
389+
385390
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
386391
JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
387392

@@ -440,6 +445,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
440445
jobConfigProvider,
441446
datafeedConfigProvider,
442447
jobManager,
448+
jobManagerHolder,
443449
autodetectProcessManager,
444450
new MlInitializationService(settings, threadPool, clusterService, client),
445451
jobDataCountsPersister,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java

+21-18
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@
2626
import org.elasticsearch.xpack.core.XPackSettings;
2727
import org.elasticsearch.xpack.core.XPackField;
2828
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
29-
import org.elasticsearch.xpack.core.ml.MlMetadata;
3029
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
3130
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
3231
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
3332
import org.elasticsearch.xpack.core.ml.job.config.Job;
3433
import org.elasticsearch.xpack.core.ml.job.config.JobState;
34+
import org.elasticsearch.xpack.ml.job.JobManagerHolder;
3535
import org.elasticsearch.xpack.ml.process.NativeController;
3636
import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
3737
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
@@ -48,6 +48,7 @@
4848
import java.util.Map;
4949
import java.util.Objects;
5050
import java.util.concurrent.TimeoutException;
51+
import java.util.stream.Collectors;
5152

5253
public class MachineLearningFeatureSet implements XPackFeatureSet {
5354

@@ -61,15 +62,17 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
6162
private final XPackLicenseState licenseState;
6263
private final ClusterService clusterService;
6364
private final Client client;
65+
private final JobManagerHolder jobManagerHolder;
6466
private final Map<String, Object> nativeCodeInfo;
6567

6668
@Inject
6769
public MachineLearningFeatureSet(Environment environment, ClusterService clusterService, Client client,
68-
@Nullable XPackLicenseState licenseState) {
70+
@Nullable XPackLicenseState licenseState, JobManagerHolder jobManagerHolder) {
6971
this.enabled = XPackSettings.MACHINE_LEARNING_ENABLED.get(environment.settings());
7072
this.clusterService = Objects.requireNonNull(clusterService);
7173
this.client = Objects.requireNonNull(client);
7274
this.licenseState = licenseState;
75+
this.jobManagerHolder = jobManagerHolder;
7376
Map<String, Object> nativeCodeInfo = NativeController.UNKNOWN_NATIVE_CODE_INFO;
7477
// Don't try to get the native code version if ML is disabled - it causes too much controversy
7578
// if ML has been disabled because of some OS incompatibility. Also don't try to get the native
@@ -135,7 +138,7 @@ public Map<String, Object> nativeCodeInfo() {
135138
@Override
136139
public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
137140
ClusterState state = clusterService.state();
138-
new Retriever(client, MlMetadata.getMlMetadata(state), available(), enabled(), mlNodeCount(state)).execute(listener);
141+
new Retriever(client, jobManagerHolder, available(), enabled(), mlNodeCount(state)).execute(listener);
139142
}
140143

141144
private int mlNodeCount(final ClusterState clusterState) {
@@ -156,16 +159,16 @@ private int mlNodeCount(final ClusterState clusterState) {
156159
public static class Retriever {
157160

158161
private final Client client;
159-
private final MlMetadata mlMetadata;
162+
private final JobManagerHolder jobManagerHolder;
160163
private final boolean available;
161164
private final boolean enabled;
162165
private Map<String, Object> jobsUsage;
163166
private Map<String, Object> datafeedsUsage;
164167
private int nodeCount;
165168

166-
public Retriever(Client client, MlMetadata mlMetadata, boolean available, boolean enabled, int nodeCount) {
169+
public Retriever(Client client, JobManagerHolder jobManagerHolder, boolean available, boolean enabled, int nodeCount) {
167170
this.client = Objects.requireNonNull(client);
168-
this.mlMetadata = mlMetadata;
171+
this.jobManagerHolder = jobManagerHolder;
169172
this.available = available;
170173
this.enabled = enabled;
171174
this.jobsUsage = new LinkedHashMap<>();
@@ -174,7 +177,8 @@ public Retriever(Client client, MlMetadata mlMetadata, boolean available, boolea
174177
}
175178

176179
public void execute(ActionListener<Usage> listener) {
177-
if (enabled == false) {
180+
// empty holder means either ML disabled or transport client mode
181+
if (jobManagerHolder.isEmpty()) {
178182
listener.onResponse(
179183
new MachineLearningFeatureSetUsage(available, enabled, Collections.emptyMap(), Collections.emptyMap(), 0));
180184
return;
@@ -194,20 +198,19 @@ public void execute(ActionListener<Usage> listener) {
194198
GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(MetaData.ALL);
195199
ActionListener<GetJobsStatsAction.Response> jobStatsListener = ActionListener.wrap(
196200
response -> {
197-
addJobsUsage(response);
198-
GetDatafeedsStatsAction.Request datafeedStatsRequest =
199-
new GetDatafeedsStatsAction.Request(GetDatafeedsStatsAction.ALL);
200-
client.execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest,
201-
datafeedStatsListener);
202-
},
203-
listener::onFailure
204-
);
201+
jobManagerHolder.getJobManager().expandJobs(MetaData.ALL, true, ActionListener.wrap(jobs -> {
202+
addJobsUsage(response, jobs.results());
203+
GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(
204+
GetDatafeedsStatsAction.ALL);
205+
client.execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest, datafeedStatsListener);
206+
}, listener::onFailure));
207+
}, listener::onFailure);
205208

206209
// Step 0. Kick off the chain of callbacks by requesting jobs stats
207210
client.execute(GetJobsStatsAction.INSTANCE, jobStatsRequest, jobStatsListener);
208211
}
209212

210-
private void addJobsUsage(GetJobsStatsAction.Response response) {
213+
private void addJobsUsage(GetJobsStatsAction.Response response, List<Job> jobs) {
211214
StatsAccumulator allJobsDetectorsStats = new StatsAccumulator();
212215
StatsAccumulator allJobsModelSizeStats = new StatsAccumulator();
213216
ForecastStats allJobsForecastStats = new ForecastStats();
@@ -217,11 +220,11 @@ private void addJobsUsage(GetJobsStatsAction.Response response) {
217220
Map<JobState, StatsAccumulator> modelSizeStatsByState = new HashMap<>();
218221
Map<JobState, ForecastStats> forecastStatsByState = new HashMap<>();
219222

220-
Map<String, Job> jobs = mlMetadata.getJobs();
221223
List<GetJobsStatsAction.Response.JobStats> jobsStats = response.getResponse().results();
224+
Map<String, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, item -> item));
222225
for (GetJobsStatsAction.Response.JobStats jobStats : jobsStats) {
223226
ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
224-
int detectorsCount = jobs.get(jobStats.getJobId()).getAnalysisConfig()
227+
int detectorsCount = jobMap.get(jobStats.getJobId()).getAnalysisConfig()
225228
.getDetectors().size();
226229
double modelSize = modelSizeStats == null ? 0.0
227230
: jobStats.getModelSizeStats().getModelBytes();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ml.job;
8+
9+
import org.elasticsearch.ElasticsearchException;
10+
11+
public class JobManagerHolder {
12+
13+
private final JobManager instance;
14+
15+
/**
16+
* Create an empty holder which also means that no job manager gets created.
17+
*/
18+
public JobManagerHolder() {
19+
this.instance = null;
20+
}
21+
22+
/**
23+
* Create a holder that allows lazy creation of a job manager.
24+
*
25+
*/
26+
public JobManagerHolder(JobManager jobManager) {
27+
this.instance = jobManager;
28+
}
29+
30+
public boolean isEmpty() {
31+
return instance == null;
32+
}
33+
34+
/**
35+
* Get the instance of the held JobManager.
36+
*
37+
* @return job manager instance
38+
* @throws ElasticsearchException if holder has been created with the empty constructor
39+
*/
40+
public JobManager getJobManager() {
41+
if (instance == null) {
42+
throw new ElasticsearchException("Tried to get job manager although Machine Learning is disabled");
43+
}
44+
return instance;
45+
}
46+
}

0 commit comments

Comments
 (0)