Skip to content

Commit 7b56999

Browse files
committed
[ML] Change JobManager to work with Job config in index (elastic#33064)
1 parent 59a1205 commit 7b56999

30 files changed

+1256
-593
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,15 @@
1212
import org.elasticsearch.xpack.core.ml.job.config.JobState;
1313
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
1414

15+
import java.util.Collection;
16+
import java.util.Set;
17+
import java.util.stream.Collectors;
18+
1519
public final class MlTasks {
1620

21+
public static final String JOB_TASK_PREFIX = "job-";
22+
public static final String DATAFEED_TASK_PREFIX = "datafeed-";
23+
1724
private MlTasks() {
1825
}
1926

@@ -22,15 +29,15 @@ private MlTasks() {
2229
* A datafeed id can be used as a job id, because they are stored separately in cluster state.
2330
*/
2431
public static String jobTaskId(String jobId) {
25-
return "job-" + jobId;
32+
return JOB_TASK_PREFIX + jobId;
2633
}
2734

2835
/**
2936
* Namespaces the task ids for datafeeds.
3037
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
3138
*/
3239
public static String datafeedTaskId(String datafeedId) {
33-
return "datafeed-" + datafeedId;
40+
return DATAFEED_TASK_PREFIX + datafeedId;
3441
}
3542

3643
@Nullable
@@ -67,4 +74,17 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
6774
return DatafeedState.STOPPED;
6875
}
6976
}
77+
78+
/**
79+
* The job Ids of anomaly detector job tasks
80+
* @param tasks Active tasks
81+
* @return The job Ids of anomaly detector job tasks
82+
*/
83+
public static Set<String> openJobIds(PersistentTasksCustomMetaData tasks) {
84+
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> activeTasks = tasks.tasks();
85+
86+
return activeTasks.stream().filter(t -> t.getId().startsWith(JOB_TASK_PREFIX))
87+
.map(t -> t.getId().substring(JOB_TASK_PREFIX.length()))
88+
.collect(Collectors.toSet());
89+
}
7090
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,6 @@ public static void addJobConfigFields(XContentBuilder builder) throws IOExceptio
248248
.startObject(AnalysisConfig.MULTIVARIATE_BY_FIELDS.getPreferredName())
249249
.field(TYPE, BOOLEAN)
250250
.endObject()
251-
.startObject(AnalysisConfig.USE_PER_PARTITION_NORMALIZATION.getPreferredName())
252-
.field(TYPE, BOOLEAN)
253-
.endObject()
254251
.endObject()
255252
.endObject()
256253

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,6 @@ public final class ReservedFieldNames {
210210
AnalysisConfig.OVERLAPPING_BUCKETS.getPreferredName(),
211211
AnalysisConfig.RESULT_FINALIZATION_WINDOW.getPreferredName(),
212212
AnalysisConfig.MULTIVARIATE_BY_FIELDS.getPreferredName(),
213-
AnalysisConfig.USE_PER_PARTITION_NORMALIZATION.getPreferredName(),
214213

215214
AnalysisLimits.MODEL_MEMORY_LIMIT.getPreferredName(),
216215
AnalysisLimits.CATEGORIZATION_EXAMPLES_LIMIT.getPreferredName(),

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
366366
Auditor auditor = new Auditor(client, clusterService.getNodeName());
367367
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
368368
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool);
369-
JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, client, notifier);
369+
JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, threadPool, client, notifier);
370370

371371
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client);
372372
JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,11 @@ protected void doExecute(Task task, DeleteCalendarAction.Request request, Action
6262
listener.onFailure(new ResourceNotFoundException("No calendar with id [" + calendarId + "]"));
6363
return;
6464
}
65-
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds());
66-
listener.onResponse(new AcknowledgedResponse(true));
65+
66+
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds(), ActionListener.wrap(
67+
r -> listener.onResponse(new AcknowledgedResponse(true)),
68+
listener::onFailure
69+
));
6770
},
6871
listener::onFailure));
6972
},

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarEventAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,10 @@ public void onResponse(DeleteResponse response) {
102102
if (response.status() == RestStatus.NOT_FOUND) {
103103
listener.onFailure(new ResourceNotFoundException("No event with id [" + eventId + "]"));
104104
} else {
105-
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds());
106-
listener.onResponse(new AcknowledgedResponse(true));
105+
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds(), ActionListener.wrap(
106+
r -> listener.onResponse(new AcknowledgedResponse(true)),
107+
listener::onFailure
108+
));
107109
}
108110
}
109111

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,15 @@
1212
import org.elasticsearch.action.support.HandledTransportAction;
1313
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1414
import org.elasticsearch.client.Client;
15-
import org.elasticsearch.cluster.service.ClusterService;
1615
import org.elasticsearch.common.inject.Inject;
1716
import org.elasticsearch.common.settings.Settings;
1817
import org.elasticsearch.tasks.Task;
1918
import org.elasticsearch.transport.TransportService;
2019
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
21-
import org.elasticsearch.xpack.core.ml.job.config.Job;
2220
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
23-
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
2421
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
2522
import org.elasticsearch.xpack.ml.job.JobManager;
23+
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
2624
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
2725
import org.elasticsearch.xpack.ml.notifications.Auditor;
2826

@@ -33,19 +31,19 @@ public class TransportDeleteModelSnapshotAction extends HandledTransportAction<D
3331
AcknowledgedResponse> {
3432

3533
private final Client client;
34+
private final JobManager jobManager;
3635
private final JobResultsProvider jobResultsProvider;
37-
private final ClusterService clusterService;
3836
private final Auditor auditor;
3937

4038
@Inject
4139
public TransportDeleteModelSnapshotAction(Settings settings, TransportService transportService, ActionFilters actionFilters,
42-
JobResultsProvider jobResultsProvider, ClusterService clusterService, Client client,
40+
JobResultsProvider jobResultsProvider, Client client, JobManager jobManager,
4341
Auditor auditor) {
4442
super(settings, DeleteModelSnapshotAction.NAME, transportService, actionFilters,
4543
DeleteModelSnapshotAction.Request::new);
4644
this.client = client;
45+
this.jobManager = jobManager;
4746
this.jobResultsProvider = jobResultsProvider;
48-
this.clusterService = clusterService;
4947
this.auditor = auditor;
5048
}
5149

@@ -70,32 +68,40 @@ protected void doExecute(Task task, DeleteModelSnapshotAction.Request request,
7068
ModelSnapshot deleteCandidate = deleteCandidates.get(0);
7169

7270
// Verify the snapshot is not being used
73-
Job job = JobManager.getJobOrThrowIfUnknown(request.getJobId(), clusterService.state());
74-
String currentModelInUse = job.getModelSnapshotId();
75-
if (currentModelInUse != null && currentModelInUse.equals(request.getSnapshotId())) {
76-
throw new IllegalArgumentException(Messages.getMessage(Messages.REST_CANNOT_DELETE_HIGHEST_PRIORITY,
77-
request.getSnapshotId(), request.getJobId()));
78-
}
71+
jobManager.getJob(request.getJobId(), ActionListener.wrap(
72+
job -> {
73+
String currentModelInUse = job.getModelSnapshotId();
74+
if (currentModelInUse != null && currentModelInUse.equals(request.getSnapshotId())) {
75+
listener.onFailure(
76+
new IllegalArgumentException(Messages.getMessage(Messages.REST_CANNOT_DELETE_HIGHEST_PRIORITY,
77+
request.getSnapshotId(), request.getJobId())));
78+
return;
79+
}
80+
81+
// Delete the snapshot and any associated state files
82+
JobDataDeleter deleter = new JobDataDeleter(client, request.getJobId());
83+
deleter.deleteModelSnapshots(Collections.singletonList(deleteCandidate),
84+
new ActionListener<BulkResponse>() {
85+
@Override
86+
public void onResponse(BulkResponse bulkResponse) {
87+
String msg = Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_DELETED,
88+
deleteCandidate.getSnapshotId(), deleteCandidate.getDescription());
7989

80-
// Delete the snapshot and any associated state files
81-
JobDataDeleter deleter = new JobDataDeleter(client, request.getJobId());
82-
deleter.deleteModelSnapshots(Collections.singletonList(deleteCandidate), new ActionListener<BulkResponse>() {
83-
@Override
84-
public void onResponse(BulkResponse bulkResponse) {
85-
String msg = Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_DELETED, deleteCandidate.getSnapshotId(),
86-
deleteCandidate.getDescription());
87-
auditor.info(request.getJobId(), msg);
88-
logger.debug("[{}] {}", request.getJobId(), msg);
89-
// We don't care about the bulk response, just that it succeeded
90-
listener.onResponse(new AcknowledgedResponse(true));
91-
}
90+
auditor.info(request.getJobId(), msg);
91+
logger.debug("[{}] {}", request.getJobId(), msg);
92+
// We don't care about the bulk response, just that it succeeded
93+
listener.onResponse(new AcknowledgedResponse(true));
94+
}
9295

93-
@Override
94-
public void onFailure(Exception e) {
95-
listener.onFailure(e);
96-
}
97-
});
96+
@Override
97+
public void onFailure(Exception e) {
98+
listener.onFailure(e);
99+
}
100+
});
98101

102+
},
103+
listener::onFailure
104+
));
99105
}, listener::onFailure);
100106
}
101107
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java

Lines changed: 51 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import org.elasticsearch.Version;
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.support.ActionFilters;
12-
import org.elasticsearch.cluster.ClusterState;
1312
import org.elasticsearch.cluster.service.ClusterService;
1413
import org.elasticsearch.common.inject.Inject;
1514
import org.elasticsearch.common.io.stream.StreamInput;
@@ -41,14 +40,17 @@ public class TransportForecastJobAction extends TransportJobTaskAction<ForecastJ
4140
private static final ByteSizeValue FORECAST_LOCAL_STORAGE_LIMIT = new ByteSizeValue(500, ByteSizeUnit.MB);
4241

4342
private final JobResultsProvider jobResultsProvider;
43+
private final JobManager jobManager;
4444
@Inject
4545
public TransportForecastJobAction(Settings settings, TransportService transportService,
4646
ClusterService clusterService, ActionFilters actionFilters,
47-
JobResultsProvider jobResultsProvider, AutodetectProcessManager processManager) {
47+
JobResultsProvider jobResultsProvider, AutodetectProcessManager processManager,
48+
JobManager jobManager) {
4849
super(settings, ForecastJobAction.NAME, clusterService, transportService, actionFilters,
4950
ForecastJobAction.Request::new, ForecastJobAction.Response::new,
5051
ThreadPool.Names.SAME, processManager);
5152
this.jobResultsProvider = jobResultsProvider;
53+
this.jobManager = jobManager;
5254
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
5355
}
5456

@@ -62,57 +64,63 @@ protected ForecastJobAction.Response readTaskResponse(StreamInput in) throws IOE
6264
@Override
6365
protected void taskOperation(ForecastJobAction.Request request, TransportOpenJobAction.JobTask task,
6466
ActionListener<ForecastJobAction.Response> listener) {
65-
ClusterState state = clusterService.state();
66-
Job job = JobManager.getJobOrThrowIfUnknown(task.getJobId(), state);
67-
validate(job, request);
67+
jobManager.getJob(task.getJobId(), ActionListener.wrap(
68+
job -> {
69+
validate(job, request);
6870

69-
ForecastParams.Builder paramsBuilder = ForecastParams.builder();
71+
ForecastParams.Builder paramsBuilder = ForecastParams.builder();
7072

71-
if (request.getDuration() != null) {
72-
paramsBuilder.duration(request.getDuration());
73-
}
73+
if (request.getDuration() != null) {
74+
paramsBuilder.duration(request.getDuration());
75+
}
7476

75-
if (request.getExpiresIn() != null) {
76-
paramsBuilder.expiresIn(request.getExpiresIn());
77-
}
77+
if (request.getExpiresIn() != null) {
78+
paramsBuilder.expiresIn(request.getExpiresIn());
79+
}
7880

79-
// tmp storage might be null, we do not log here, because it might not be
80-
// required
81-
Path tmpStorage = processManager.tryGetTmpStorage(task, FORECAST_LOCAL_STORAGE_LIMIT);
82-
if (tmpStorage != null) {
83-
paramsBuilder.tmpStorage(tmpStorage.toString());
84-
}
81+
// tmp storage might be null, we do not log here, because it might not be
82+
// required
83+
Path tmpStorage = processManager.tryGetTmpStorage(task, FORECAST_LOCAL_STORAGE_LIMIT);
84+
if (tmpStorage != null) {
85+
paramsBuilder.tmpStorage(tmpStorage.toString());
86+
}
8587

86-
ForecastParams params = paramsBuilder.build();
87-
processManager.forecastJob(task, params, e -> {
88-
if (e == null) {
89-
Consumer<ForecastRequestStats> forecastRequestStatsHandler = forecastRequestStats -> {
90-
if (forecastRequestStats == null) {
91-
// paranoia case, it should not happen that we do not retrieve a result
92-
listener.onFailure(new ElasticsearchException(
93-
"Cannot run forecast: internal error, please check the logs"));
94-
} else if (forecastRequestStats.getStatus() == ForecastRequestStats.ForecastRequestStatus.FAILED) {
95-
List<String> messages = forecastRequestStats.getMessages();
96-
if (messages.size() > 0) {
97-
listener.onFailure(ExceptionsHelper.badRequestException("Cannot run forecast: "
98-
+ messages.get(0)));
88+
ForecastParams params = paramsBuilder.build();
89+
processManager.forecastJob(task, params, e -> {
90+
if (e == null) {
91+
; getForecastRequestStats(request.getJobId(), params.getForecastId(), listener);
9992
} else {
100-
// paranoia case, it should not be possible to have an empty message list
101-
listener.onFailure(
102-
new ElasticsearchException(
103-
"Cannot run forecast: internal error, please check the logs"));
93+
listener.onFailure(e);
10494
}
105-
} else {
106-
listener.onResponse(new ForecastJobAction.Response(true, params.getForecastId()));
107-
}
108-
};
95+
});
96+
},
97+
listener::onFailure
98+
));
99+
}
109100

110-
jobResultsProvider.getForecastRequestStats(request.getJobId(), params.getForecastId(),
111-
forecastRequestStatsHandler, listener::onFailure);
101+
private void getForecastRequestStats(String jobId, String forecastId, ActionListener<ForecastJobAction.Response> listener) {
102+
Consumer<ForecastRequestStats> forecastRequestStatsHandler = forecastRequestStats -> {
103+
if (forecastRequestStats == null) {
104+
// paranoia case, it should not happen that we do not retrieve a result
105+
listener.onFailure(new ElasticsearchException(
106+
"Cannot run forecast: internal error, please check the logs"));
107+
} else if (forecastRequestStats.getStatus() == ForecastRequestStats.ForecastRequestStatus.FAILED) {
108+
List<String> messages = forecastRequestStats.getMessages();
109+
if (messages.size() > 0) {
110+
listener.onFailure(ExceptionsHelper.badRequestException("Cannot run forecast: "
111+
+ messages.get(0)));
112+
} else {
113+
// paranoia case, it should not be possible to have an empty message list
114+
listener.onFailure(
115+
new ElasticsearchException(
116+
"Cannot run forecast: internal error, please check the logs"));
117+
}
112118
} else {
113-
listener.onFailure(e);
119+
listener.onResponse(new ForecastJobAction.Response(true, forecastId));
114120
}
115-
});
121+
};
122+
123+
jobResultsProvider.getForecastRequestStats(jobId, forecastId, forecastRequestStatsHandler, listener::onFailure);
116124
}
117125

118126
static void validate(Job job, ForecastJobAction.Request request) {

0 commit comments

Comments
 (0)