Skip to content

Commit 0412504

Browse files
authored
[ML] Adding data frame analytics stats to _usage API (elastic#45820)
* [ML] Adding data frame analytics stats to _usage API * making the size of analytics stats 10k
1 parent e8c8230 commit 0412504

File tree

3 files changed

+91
-15
lines changed

3 files changed

+91
-15
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@
55
*/
66
package org.elasticsearch.xpack.core.ml;
77

8+
import org.elasticsearch.Version;
89
import org.elasticsearch.common.io.stream.StreamInput;
910
import org.elasticsearch.common.io.stream.StreamOutput;
1011
import org.elasticsearch.common.xcontent.XContentBuilder;
1112
import org.elasticsearch.xpack.core.XPackFeatureSet;
1213
import org.elasticsearch.xpack.core.XPackField;
1314

1415
import java.io.IOException;
16+
import java.util.Collections;
1517
import java.util.Map;
1618
import java.util.Objects;
1719

@@ -26,23 +28,35 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage {
2628
public static final String MODEL_SIZE = "model_size";
2729
public static final String CREATED_BY = "created_by";
2830
public static final String NODE_COUNT = "node_count";
31+
public static final String DATA_FRAME_ANALYTICS_JOBS_FIELD = "data_frame_analytics_jobs";
2932

3033
private final Map<String, Object> jobsUsage;
3134
private final Map<String, Object> datafeedsUsage;
35+
private final Map<String, Object> analyticsUsage;
3236
private final int nodeCount;
3337

34-
public MachineLearningFeatureSetUsage(boolean available, boolean enabled, Map<String, Object> jobsUsage,
35-
Map<String, Object> datafeedsUsage, int nodeCount) {
38+
public MachineLearningFeatureSetUsage(boolean available,
39+
boolean enabled,
40+
Map<String, Object> jobsUsage,
41+
Map<String, Object> datafeedsUsage,
42+
Map<String, Object> analyticsUsage,
43+
int nodeCount) {
3644
super(XPackField.MACHINE_LEARNING, available, enabled);
3745
this.jobsUsage = Objects.requireNonNull(jobsUsage);
3846
this.datafeedsUsage = Objects.requireNonNull(datafeedsUsage);
47+
this.analyticsUsage = Objects.requireNonNull(analyticsUsage);
3948
this.nodeCount = nodeCount;
4049
}
4150

4251
public MachineLearningFeatureSetUsage(StreamInput in) throws IOException {
4352
super(in);
4453
this.jobsUsage = in.readMap();
4554
this.datafeedsUsage = in.readMap();
55+
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
56+
this.analyticsUsage = in.readMap();
57+
} else {
58+
this.analyticsUsage = Collections.emptyMap();
59+
}
4660
this.nodeCount = in.readInt();
4761
}
4862

@@ -51,18 +65,18 @@ public void writeTo(StreamOutput out) throws IOException {
5165
super.writeTo(out);
5266
out.writeMap(jobsUsage);
5367
out.writeMap(datafeedsUsage);
68+
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
69+
out.writeMap(analyticsUsage);
70+
}
5471
out.writeInt(nodeCount);
5572
}
5673

5774
@Override
5875
protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
5976
super.innerXContent(builder, params);
60-
if (jobsUsage != null) {
61-
builder.field(JOBS_FIELD, jobsUsage);
62-
}
63-
if (datafeedsUsage != null) {
64-
builder.field(DATAFEEDS_FIELD, datafeedsUsage);
65-
}
77+
builder.field(JOBS_FIELD, jobsUsage);
78+
builder.field(DATAFEEDS_FIELD, datafeedsUsage);
79+
builder.field(DATA_FRAME_ANALYTICS_JOBS_FIELD, analyticsUsage);
6680
if (nodeCount >= 0) {
6781
builder.field(NODE_COUNT, nodeCount);
6882
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@
2525
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
2626
import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse;
2727
import org.elasticsearch.xpack.core.action.XPackUsageFeatureTransportAction;
28+
import org.elasticsearch.xpack.core.action.util.PageParams;
2829
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
30+
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
2931
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
3032
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
3133
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
34+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
3235
import org.elasticsearch.xpack.core.ml.job.config.Job;
3336
import org.elasticsearch.xpack.core.ml.job.config.JobState;
3437
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
@@ -69,22 +72,35 @@ protected void masterOperation(Task task, XPackUsageRequest request, ClusterStat
6972
ActionListener<XPackUsageFeatureResponse> listener) {
7073
if (enabled == false) {
7174
MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(), enabled,
72-
Collections.emptyMap(), Collections.emptyMap(), 0);
75+
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 0);
7376
listener.onResponse(new XPackUsageFeatureResponse(usage));
7477
return;
7578
}
7679

7780
Map<String, Object> jobsUsage = new LinkedHashMap<>();
7881
Map<String, Object> datafeedsUsage = new LinkedHashMap<>();
82+
Map<String, Object> analyticsUsage = new LinkedHashMap<>();
7983
int nodeCount = mlNodeCount(state);
8084

85+
// Step 3. Extract usage from data frame analytics stats and return usage response
86+
ActionListener<GetDataFrameAnalyticsStatsAction.Response> dataframeAnalyticsListener = ActionListener.wrap(
87+
response -> {
88+
addDataFrameAnalyticsUsage(response, analyticsUsage);
89+
MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(),
90+
enabled, jobsUsage, datafeedsUsage, analyticsUsage, nodeCount);
91+
listener.onResponse(new XPackUsageFeatureResponse(usage));
92+
},
93+
listener::onFailure
94+
);
95+
8196
// Step 2. Extract usage from datafeeds stats and return usage response
8297
ActionListener<GetDatafeedsStatsAction.Response> datafeedStatsListener =
8398
ActionListener.wrap(response -> {
8499
addDatafeedsUsage(response, datafeedsUsage);
85-
MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(),
86-
enabled, jobsUsage, datafeedsUsage, nodeCount);
87-
listener.onResponse(new XPackUsageFeatureResponse(usage));
100+
GetDataFrameAnalyticsStatsAction.Request dataframeAnalyticsStatsRequest =
101+
new GetDataFrameAnalyticsStatsAction.Request(GetDatafeedsStatsAction.ALL);
102+
dataframeAnalyticsStatsRequest.setPageParams(new PageParams(0, 10_000));
103+
client.execute(GetDataFrameAnalyticsStatsAction.INSTANCE, dataframeAnalyticsStatsRequest, dataframeAnalyticsListener);
88104
},
89105
listener::onFailure);
90106

@@ -184,19 +200,33 @@ private void addDatafeedsUsage(GetDatafeedsStatsAction.Response response, Map<St
184200
ds -> Counter.newCounter()).addAndGet(1);
185201
}
186202

187-
datafeedsUsage.put(MachineLearningFeatureSetUsage.ALL, createDatafeedUsageEntry(response.getResponse().count()));
203+
datafeedsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count()));
188204
for (DatafeedState datafeedState : datafeedCountByState.keySet()) {
189205
datafeedsUsage.put(datafeedState.name().toLowerCase(Locale.ROOT),
190-
createDatafeedUsageEntry(datafeedCountByState.get(datafeedState).get()));
206+
createCountUsageEntry(datafeedCountByState.get(datafeedState).get()));
191207
}
192208
}
193209

194-
private Map<String, Object> createDatafeedUsageEntry(long count) {
210+
private Map<String, Object> createCountUsageEntry(long count) {
195211
Map<String, Object> usage = new HashMap<>();
196212
usage.put(MachineLearningFeatureSetUsage.COUNT, count);
197213
return usage;
198214
}
199215

216+
private void addDataFrameAnalyticsUsage(GetDataFrameAnalyticsStatsAction.Response response,
217+
Map<String, Object> dataframeAnalyticsUsage) {
218+
Map<DataFrameAnalyticsState, Counter> dataFrameAnalyticsStateCounterMap = new HashMap<>();
219+
220+
for(GetDataFrameAnalyticsStatsAction.Response.Stats stats : response.getResponse().results()) {
221+
dataFrameAnalyticsStateCounterMap.computeIfAbsent(stats.getState(), ds -> Counter.newCounter()).addAndGet(1);
222+
}
223+
dataframeAnalyticsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count()));
224+
for (DataFrameAnalyticsState state : dataFrameAnalyticsStateCounterMap.keySet()) {
225+
dataframeAnalyticsUsage.put(state.name().toLowerCase(Locale.ROOT),
226+
createCountUsageEntry(dataFrameAnalyticsStateCounterMap.get(state).get()));
227+
}
228+
}
229+
200230
private static int mlNodeCount(final ClusterState clusterState) {
201231
int mlNodeCount = 0;
202232
for (DiscoveryNode node : clusterState.getNodes()) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,13 @@
3434
import org.elasticsearch.xpack.core.action.util.QueryPage;
3535
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
3636
import org.elasticsearch.xpack.core.ml.MachineLearningField;
37+
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
38+
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
3739
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
3840
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
3941
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
4042
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
43+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
4144
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
4245
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
4346
import org.elasticsearch.xpack.core.ml.job.config.Detector;
@@ -95,6 +98,7 @@ public void init() {
9598
when(clusterService.state()).thenReturn(clusterState);
9699
givenJobs(Collections.emptyList(), Collections.emptyList());
97100
givenDatafeeds(Collections.emptyList());
101+
givenDataFrameAnalytics(Collections.emptyList());
98102
}
99103

100104
private MachineLearningUsageTransportAction newUsageAction(Settings settings) {
@@ -165,6 +169,11 @@ public void testUsage() throws Exception {
165169
buildDatafeedStats(DatafeedState.STARTED),
166170
buildDatafeedStats(DatafeedState.STOPPED)
167171
));
172+
givenDataFrameAnalytics(Arrays.asList(
173+
buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED),
174+
buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED),
175+
buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STARTED)
176+
));
168177

169178
var usageAction = newUsageAction(settings.build());
170179
PlainActionFuture<XPackUsageFeatureResponse> future = new PlainActionFuture<>();
@@ -230,6 +239,10 @@ public void testUsage() throws Exception {
230239
assertThat(source.getValue("datafeeds.started.count"), equalTo(2));
231240
assertThat(source.getValue("datafeeds.stopped.count"), equalTo(1));
232241

242+
assertThat(source.getValue("data_frame_analytics_jobs._all.count"), equalTo(3));
243+
assertThat(source.getValue("data_frame_analytics_jobs.started.count"), equalTo(1));
244+
assertThat(source.getValue("data_frame_analytics_jobs.stopped.count"), equalTo(2));
245+
233246
assertThat(source.getValue("jobs._all.forecasts.total"), equalTo(11));
234247
assertThat(source.getValue("jobs._all.forecasts.forecasted_jobs"), equalTo(2));
235248

@@ -391,6 +404,19 @@ private void givenDatafeeds(List<GetDatafeedsStatsAction.Response.DatafeedStats>
391404
}).when(client).execute(same(GetDatafeedsStatsAction.INSTANCE), any(), any());
392405
}
393406

407+
private void givenDataFrameAnalytics(List<GetDataFrameAnalyticsStatsAction.Response.Stats> dataFrameAnalyticsStats) {
408+
doAnswer(invocationOnMock -> {
409+
@SuppressWarnings("unchecked")
410+
ActionListener<GetDataFrameAnalyticsStatsAction.Response> listener =
411+
(ActionListener<GetDataFrameAnalyticsStatsAction.Response>) invocationOnMock.getArguments()[2];
412+
listener.onResponse(new GetDataFrameAnalyticsStatsAction.Response(
413+
new QueryPage<>(dataFrameAnalyticsStats,
414+
dataFrameAnalyticsStats.size(),
415+
GetDataFrameAnalyticsAction.Response.RESULTS_FIELD)));
416+
return Void.TYPE;
417+
}).when(client).execute(same(GetDataFrameAnalyticsStatsAction.INSTANCE), any(), any());
418+
}
419+
394420
private static Detector buildMinDetector(String fieldName) {
395421
Detector.Builder detectorBuilder = new Detector.Builder();
396422
detectorBuilder.setFunction("min");
@@ -431,6 +457,12 @@ private static GetDatafeedsStatsAction.Response.DatafeedStats buildDatafeedStats
431457
return stats;
432458
}
433459

460+
private static GetDataFrameAnalyticsStatsAction.Response.Stats buildDataFrameAnalyticsStats(DataFrameAnalyticsState state) {
461+
GetDataFrameAnalyticsStatsAction.Response.Stats stats = mock(GetDataFrameAnalyticsStatsAction.Response.Stats.class);
462+
when(stats.getState()).thenReturn(state);
463+
return stats;
464+
}
465+
434466
private static ForecastStats buildForecastStats(long numberOfForecasts) {
435467
return new ForecastStatsTests().createForecastStats(numberOfForecasts, numberOfForecasts);
436468
}

0 commit comments

Comments
 (0)