26
26
import org .elasticsearch .xpack .core .XPackSettings ;
27
27
import org .elasticsearch .xpack .core .XPackField ;
28
28
import org .elasticsearch .xpack .core .ml .MachineLearningFeatureSetUsage ;
29
- import org .elasticsearch .xpack .core .ml .MlMetadata ;
30
29
import org .elasticsearch .xpack .core .ml .action .GetDatafeedsStatsAction ;
31
30
import org .elasticsearch .xpack .core .ml .action .GetJobsStatsAction ;
32
31
import org .elasticsearch .xpack .core .ml .datafeed .DatafeedState ;
33
32
import org .elasticsearch .xpack .core .ml .job .config .Job ;
34
33
import org .elasticsearch .xpack .core .ml .job .config .JobState ;
34
+ import org .elasticsearch .xpack .ml .job .JobManagerHolder ;
35
35
import org .elasticsearch .xpack .ml .process .NativeController ;
36
36
import org .elasticsearch .xpack .ml .process .NativeControllerHolder ;
37
37
import org .elasticsearch .xpack .core .ml .job .process .autodetect .state .ModelSizeStats ;
48
48
import java .util .Map ;
49
49
import java .util .Objects ;
50
50
import java .util .concurrent .TimeoutException ;
51
+ import java .util .stream .Collectors ;
51
52
52
53
public class MachineLearningFeatureSet implements XPackFeatureSet {
53
54
@@ -61,15 +62,17 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
61
62
private final XPackLicenseState licenseState ;
62
63
private final ClusterService clusterService ;
63
64
private final Client client ;
65
+ private final JobManagerHolder jobManagerHolder ;
64
66
private final Map <String , Object > nativeCodeInfo ;
65
67
66
68
@ Inject
67
69
public MachineLearningFeatureSet (Environment environment , ClusterService clusterService , Client client ,
68
- @ Nullable XPackLicenseState licenseState ) {
70
+ @ Nullable XPackLicenseState licenseState , JobManagerHolder jobManagerHolder ) {
69
71
this .enabled = XPackSettings .MACHINE_LEARNING_ENABLED .get (environment .settings ());
70
72
this .clusterService = Objects .requireNonNull (clusterService );
71
73
this .client = Objects .requireNonNull (client );
72
74
this .licenseState = licenseState ;
75
+ this .jobManagerHolder = jobManagerHolder ;
73
76
Map <String , Object > nativeCodeInfo = NativeController .UNKNOWN_NATIVE_CODE_INFO ;
74
77
// Don't try to get the native code version if ML is disabled - it causes too much controversy
75
78
// if ML has been disabled because of some OS incompatibility. Also don't try to get the native
@@ -135,7 +138,7 @@ public Map<String, Object> nativeCodeInfo() {
135
138
@ Override
136
139
public void usage (ActionListener <XPackFeatureSet .Usage > listener ) {
137
140
ClusterState state = clusterService .state ();
138
- new Retriever (client , MlMetadata . getMlMetadata ( state ) , available (), enabled (), mlNodeCount (state )).execute (listener );
141
+ new Retriever (client , jobManagerHolder , available (), enabled (), mlNodeCount (state )).execute (listener );
139
142
}
140
143
141
144
private int mlNodeCount (final ClusterState clusterState ) {
@@ -156,16 +159,16 @@ private int mlNodeCount(final ClusterState clusterState) {
156
159
public static class Retriever {
157
160
158
161
private final Client client ;
159
- private final MlMetadata mlMetadata ;
162
+ private final JobManagerHolder jobManagerHolder ;
160
163
private final boolean available ;
161
164
private final boolean enabled ;
162
165
private Map <String , Object > jobsUsage ;
163
166
private Map <String , Object > datafeedsUsage ;
164
167
private int nodeCount ;
165
168
166
- public Retriever (Client client , MlMetadata mlMetadata , boolean available , boolean enabled , int nodeCount ) {
169
+ public Retriever (Client client , JobManagerHolder jobManagerHolder , boolean available , boolean enabled , int nodeCount ) {
167
170
this .client = Objects .requireNonNull (client );
168
- this .mlMetadata = mlMetadata ;
171
+ this .jobManagerHolder = jobManagerHolder ;
169
172
this .available = available ;
170
173
this .enabled = enabled ;
171
174
this .jobsUsage = new LinkedHashMap <>();
@@ -174,7 +177,8 @@ public Retriever(Client client, MlMetadata mlMetadata, boolean available, boolea
174
177
}
175
178
176
179
public void execute (ActionListener <Usage > listener ) {
177
- if (enabled == false ) {
180
+ // empty holder means either ML disabled or transport client mode
181
+ if (jobManagerHolder .isEmpty ()) {
178
182
listener .onResponse (
179
183
new MachineLearningFeatureSetUsage (available , enabled , Collections .emptyMap (), Collections .emptyMap (), 0 ));
180
184
return ;
@@ -194,20 +198,19 @@ public void execute(ActionListener<Usage> listener) {
194
198
GetJobsStatsAction .Request jobStatsRequest = new GetJobsStatsAction .Request (MetaData .ALL );
195
199
ActionListener <GetJobsStatsAction .Response > jobStatsListener = ActionListener .wrap (
196
200
response -> {
197
- addJobsUsage (response );
198
- GetDatafeedsStatsAction .Request datafeedStatsRequest =
199
- new GetDatafeedsStatsAction .Request (GetDatafeedsStatsAction .ALL );
200
- client .execute (GetDatafeedsStatsAction .INSTANCE , datafeedStatsRequest ,
201
- datafeedStatsListener );
202
- },
203
- listener ::onFailure
204
- );
201
+ jobManagerHolder .getJobManager ().expandJobs (MetaData .ALL , true , ActionListener .wrap (jobs -> {
202
+ addJobsUsage (response , jobs .results ());
203
+ GetDatafeedsStatsAction .Request datafeedStatsRequest = new GetDatafeedsStatsAction .Request (
204
+ GetDatafeedsStatsAction .ALL );
205
+ client .execute (GetDatafeedsStatsAction .INSTANCE , datafeedStatsRequest , datafeedStatsListener );
206
+ }, listener ::onFailure ));
207
+ }, listener ::onFailure );
205
208
206
209
// Step 0. Kick off the chain of callbacks by requesting jobs stats
207
210
client .execute (GetJobsStatsAction .INSTANCE , jobStatsRequest , jobStatsListener );
208
211
}
209
212
210
- private void addJobsUsage (GetJobsStatsAction .Response response ) {
213
+ private void addJobsUsage (GetJobsStatsAction .Response response , List < Job > jobs ) {
211
214
StatsAccumulator allJobsDetectorsStats = new StatsAccumulator ();
212
215
StatsAccumulator allJobsModelSizeStats = new StatsAccumulator ();
213
216
ForecastStats allJobsForecastStats = new ForecastStats ();
@@ -217,11 +220,11 @@ private void addJobsUsage(GetJobsStatsAction.Response response) {
217
220
Map <JobState , StatsAccumulator > modelSizeStatsByState = new HashMap <>();
218
221
Map <JobState , ForecastStats > forecastStatsByState = new HashMap <>();
219
222
220
- Map <String , Job > jobs = mlMetadata .getJobs ();
221
223
List <GetJobsStatsAction .Response .JobStats > jobsStats = response .getResponse ().results ();
224
+ Map <String , Job > jobMap = jobs .stream ().collect (Collectors .toMap (Job ::getId , item -> item ));
222
225
for (GetJobsStatsAction .Response .JobStats jobStats : jobsStats ) {
223
226
ModelSizeStats modelSizeStats = jobStats .getModelSizeStats ();
224
- int detectorsCount = jobs .get (jobStats .getJobId ()).getAnalysisConfig ()
227
+ int detectorsCount = jobMap .get (jobStats .getJobId ()).getAnalysisConfig ()
225
228
.getDetectors ().size ();
226
229
double modelSize = modelSizeStats == null ? 0.0
227
230
: jobStats .getModelSizeStats ().getModelBytes ();
0 commit comments