Skip to content

Commit db12caa

Browse files
[ML] Fix calendar and filter updates from non-master nodes
Job updates or changes to calendars or filters may result in updating the job process if it has been running. To preserve the order of updates, process updates are queued through the UpdateJobProcessNotifier which is only running on the master node. All actions performing such updates must run on the master node. However, the CRUD actions for calendars and filters are not master node actions. They have been submitting the updates to the UpdateJobProcessNotifier even though it might not have been running (given the action was run on a non-master node). When that happens, the update never reaches the process. This commit fixes this problem by ensuring the notifier runs on all nodes and by ensuring the process update action gets the resources again before updating the process (instead of having those resources passed in the request). This ensures that even if the order of the updates gets mixed up, the latest update will read the latest state of those resources and the process will get back in sync. This leaves us with 2 types of updates: 1. updates to the job config should happen on the master node. This is because we cannot refetch the entire job and update it. We need to know the parts that have been changed. 2. updates to resources the job uses. Those can be handled on non-master nodes but they should be re-fetched by the update process action. Closes elastic#31803
1 parent 896317f commit db12caa

File tree

7 files changed

+211
-61
lines changed

7 files changed

+211
-61
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java

+36-19
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55
*/
66
package org.elasticsearch.xpack.ml.job;
77

8+
import org.apache.logging.log4j.Logger;
89
import org.elasticsearch.ElasticsearchStatusException;
910
import org.elasticsearch.ResourceNotFoundException;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.client.Client;
12-
import org.elasticsearch.cluster.LocalNodeMasterListener;
1313
import org.elasticsearch.cluster.service.ClusterService;
1414
import org.elasticsearch.common.component.AbstractComponent;
1515
import org.elasticsearch.common.component.LifecycleListener;
16+
import org.elasticsearch.common.logging.Loggers;
1617
import org.elasticsearch.common.settings.Settings;
1718
import org.elasticsearch.common.unit.TimeValue;
1819
import org.elasticsearch.threadpool.ThreadPool;
@@ -31,9 +32,26 @@
3132
import static org.elasticsearch.xpack.core.ml.action.UpdateProcessAction.Request;
3233
import static org.elasticsearch.xpack.core.ml.action.UpdateProcessAction.Response;
3334

34-
public class UpdateJobProcessNotifier extends AbstractComponent implements LocalNodeMasterListener {
35+
/**
36+
* This class serves as a queue for updates to the job process.
37+
* Queueing is important for 2 reasons: first, it throttles the updates
38+
* to the process, and second and most important, it preserves the order of the updates
39+
* for actions that run on the master node. For preserving the order of the updates
40+
* to the job config, it's necessary to handle the whole update chain on the master
41+
* node. However, for updates to resources the job uses (e.g. calendars, filters),
42+
* they can be handled on non-master nodes as long as the update process action
43+
* is fetching the latest version of those resources from the index instead of
44+
* using the version that existed while the handling action was at work. This makes
45+
* sure that even if the order of updates gets reversed, the final process update
46+
* will fetch the valid state of those external resources ensuring the process is
47+
* in sync.
48+
*/
49+
public class UpdateJobProcessNotifier extends AbstractComponent {
50+
51+
private static final Logger LOGGER = Loggers.getLogger(UpdateJobProcessNotifier.class);
3552

3653
private final Client client;
54+
private final ClusterService clusterService;
3755
private final ThreadPool threadPool;
3856
private final LinkedBlockingQueue<UpdateHolder> orderedJobUpdates = new LinkedBlockingQueue<>(1000);
3957

@@ -42,9 +60,15 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
4260
public UpdateJobProcessNotifier(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool) {
4361
super(settings);
4462
this.client = client;
63+
this.clusterService = clusterService;
4564
this.threadPool = threadPool;
46-
clusterService.addLocalNodeMasterListener(this);
4765
clusterService.addLifecycleListener(new LifecycleListener() {
66+
67+
@Override
68+
public void beforeStart() {
69+
start();
70+
}
71+
4872
@Override
4973
public void beforeStop() {
5074
stop();
@@ -56,16 +80,6 @@ boolean submitJobUpdate(UpdateParams update, ActionListener<Boolean> listener) {
5680
return orderedJobUpdates.offer(new UpdateHolder(update, listener));
5781
}
5882

59-
@Override
60-
public void onMaster() {
61-
start();
62-
}
63-
64-
@Override
65-
public void offMaster() {
66-
stop();
67-
}
68-
6983
private void start() {
7084
cancellable = threadPool.scheduleWithFixedDelay(this::processNextUpdate, TimeValue.timeValueSeconds(1), ThreadPool.Names.GENERIC);
7185
}
@@ -79,12 +93,6 @@ private void stop() {
7993
}
8094
}
8195

82-
@Override
83-
public String executorName() {
84-
// SAME is ok here, because both start() and stop() are inexpensive:
85-
return ThreadPool.Names.SAME;
86-
}
87-
8896
private void processNextUpdate() {
8997
List<UpdateHolder> updates = new ArrayList<>(orderedJobUpdates.size());
9098
try {
@@ -101,6 +109,15 @@ void executeProcessUpdates(Iterator<UpdateHolder> updatesIterator) {
101109
}
102110
UpdateHolder updateHolder = updatesIterator.next();
103111
UpdateParams update = updateHolder.update;
112+
113+
if (update.isJobUpdate() && clusterService.localNode().isMasterNode() == false) {
114+
assert clusterService.localNode().isMasterNode();
115+
LOGGER.error("Job update was submitted to non-master node [" + clusterService.nodeName() + "]; update for job ["
116+
+ update.getJobId() + "] will be ignored");
117+
executeProcessUpdates(updatesIterator);
118+
return;
119+
}
120+
104121
Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(),
105122
update.isUpdateScheduledEvents());
106123

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java

+18-21
Original file line numberDiff line numberDiff line change
@@ -16,36 +16,34 @@
1616
import org.elasticsearch.common.xcontent.XContentType;
1717
import org.elasticsearch.env.Environment;
1818
import org.elasticsearch.index.analysis.AnalysisRegistry;
19-
import org.elasticsearch.xpack.ml.MachineLearning;
20-
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
21-
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
2219
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
2320
import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig;
2421
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
2522
import org.elasticsearch.xpack.core.ml.job.config.Job;
2623
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
24+
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
25+
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
26+
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
27+
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
28+
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
29+
import org.elasticsearch.xpack.ml.MachineLearning;
30+
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
2731
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
2832
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
2933
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
3034
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
31-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
3235
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
3336
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
3437
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
35-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
36-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
37-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
3838
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter;
3939
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory;
40-
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
4140

4241
import java.io.Closeable;
4342
import java.io.IOException;
4443
import java.io.InputStream;
4544
import java.time.Duration;
4645
import java.time.ZonedDateTime;
4746
import java.util.Collections;
48-
import java.util.List;
4947
import java.util.Locale;
5048
import java.util.Optional;
5149
import java.util.concurrent.CountDownLatch;
@@ -205,30 +203,29 @@ public void killProcess(boolean awaitCompletion, boolean finish) throws IOExcept
205203
}
206204
}
207205

208-
public void writeUpdateProcessMessage(UpdateParams updateParams, List<ScheduledEvent> scheduledEvents,
209-
BiConsumer<Void, Exception> handler) {
206+
public void writeUpdateProcessMessage(UpdateProcessMessage update, BiConsumer<Void, Exception> handler) {
210207
submitOperation(() -> {
211-
if (updateParams.getModelPlotConfig() != null) {
212-
autodetectProcess.writeUpdateModelPlotMessage(updateParams.getModelPlotConfig());
208+
if (update.getModelPlotConfig() != null) {
209+
autodetectProcess.writeUpdateModelPlotMessage(update.getModelPlotConfig());
213210
}
214211

215212
// Filters have to be written before detectors
216-
if (updateParams.getFilter() != null) {
217-
autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(updateParams.getFilter()));
213+
if (update.getFilter() != null) {
214+
autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(update.getFilter()));
218215
}
219216

220217
// Add detector rules
221-
if (updateParams.getDetectorUpdates() != null) {
222-
for (JobUpdate.DetectorUpdate update : updateParams.getDetectorUpdates()) {
223-
if (update.getRules() != null) {
224-
autodetectProcess.writeUpdateDetectorRulesMessage(update.getDetectorIndex(), update.getRules());
218+
if (update.getDetectorUpdates() != null) {
219+
for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) {
220+
if (detectorUpdate.getRules() != null) {
221+
autodetectProcess.writeUpdateDetectorRulesMessage(detectorUpdate.getDetectorIndex(), detectorUpdate.getRules());
225222
}
226223
}
227224
}
228225

229226
// Add scheduled events; null means there's no update but an empty list means we should clear any events in the process
230-
if (scheduledEvents != null) {
231-
autodetectProcess.writeUpdateScheduledEventsMessage(scheduledEvents, job.getAnalysisConfig().getBucketSpan());
227+
if (update.getScheduledEvents() != null) {
228+
autodetectProcess.writeUpdateScheduledEventsMessage(update.getScheduledEvents(), job.getAnalysisConfig().getBucketSpan());
232229
}
233230

234231
return null;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

+54-18
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.ml.job.process.autodetect;
77

8-
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
9-
import org.elasticsearch.core.internal.io.IOUtils;
108
import org.apache.logging.log4j.message.ParameterizedMessage;
119
import org.elasticsearch.ElasticsearchStatusException;
1210
import org.elasticsearch.action.ActionListener;
@@ -22,35 +20,39 @@
2220
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2321
import org.elasticsearch.common.util.concurrent.ThreadContext;
2422
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
23+
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
2524
import org.elasticsearch.common.xcontent.XContentType;
25+
import org.elasticsearch.core.internal.io.IOUtils;
2626
import org.elasticsearch.env.Environment;
2727
import org.elasticsearch.index.analysis.AnalysisRegistry;
28+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
2829
import org.elasticsearch.rest.RestStatus;
2930
import org.elasticsearch.threadpool.ThreadPool;
31+
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
3032
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
3133
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
3234
import org.elasticsearch.xpack.core.ml.job.config.Job;
3335
import org.elasticsearch.xpack.core.ml.job.config.JobState;
3436
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
35-
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
37+
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
3638
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
37-
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
3839
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
3940
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
4041
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
4142
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
42-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
4343
import org.elasticsearch.xpack.ml.MachineLearning;
4444
import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask;
4545
import org.elasticsearch.xpack.ml.job.JobManager;
4646
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
4747
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
4848
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
4949
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
50+
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
5051
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
5152
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
5253
import org.elasticsearch.xpack.ml.job.process.NativeStorageProvider;
5354
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
55+
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
5456
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
5557
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
5658
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
@@ -82,6 +84,8 @@
8284
import java.util.function.Consumer;
8385

8486
import static org.elasticsearch.common.settings.Setting.Property;
87+
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
88+
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
8589

8690
public class AutodetectProcessManager extends AbstractComponent {
8791

@@ -156,7 +160,7 @@ public void onNodeStartup() {
156160
}
157161
}
158162

159-
public synchronized void closeAllJobsOnThisNode(String reason) throws IOException {
163+
public synchronized void closeAllJobsOnThisNode(String reason) {
160164
int numJobs = processByAllocation.size();
161165
if (numJobs != 0) {
162166
logger.info("Closing [{}] jobs, because [{}]", numJobs, reason);
@@ -322,8 +326,7 @@ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Excepti
322326
});
323327
}
324328

325-
public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams,
326-
Consumer<Exception> handler) {
329+
public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams, Consumer<Exception> handler) {
327330
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
328331
if (communicator == null) {
329332
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open";
@@ -332,25 +335,58 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams
332335
return;
333336
}
334337

338+
UpdateProcessMessage.Builder updateProcessMessage = new UpdateProcessMessage.Builder();
339+
updateProcessMessage.setModelPlotConfig(updateParams.getModelPlotConfig());
340+
updateProcessMessage.setDetectorUpdates(updateParams.getDetectorUpdates());
341+
342+
// Step 3. Set scheduled events on message and write update process message
335343
ActionListener<QueryPage<ScheduledEvent>> eventsListener = ActionListener.wrap(
336344
events -> {
337-
communicator.writeUpdateProcessMessage(updateParams, events == null ? null : events.results(), (aVoid, e) -> {
345+
updateProcessMessage.setScheduledEvents(events == null ? null : events.results());
346+
communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> {
338347
if (e == null) {
339348
handler.accept(null);
340349
} else {
341350
handler.accept(e);
342351
}
343352
});
344-
},
345-
handler::accept);
346-
347-
if (updateParams.isUpdateScheduledEvents()) {
348-
Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId());
349-
DataCounts dataCounts = getStatistics(jobTask).get().v1();
350-
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts));
351-
jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
353+
}, handler
354+
);
355+
356+
// Step 2. Set the filter on the message and get scheduled events
357+
ActionListener<MlFilter> filterListener = ActionListener.wrap(
358+
filter -> {
359+
updateProcessMessage.setFilter(filter);
360+
361+
if (updateParams.isUpdateScheduledEvents()) {
362+
Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId());
363+
DataCounts dataCounts = getStatistics(jobTask).get().v1();
364+
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts));
365+
jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
366+
} else {
367+
eventsListener.onResponse(null);
368+
}
369+
}, handler
370+
);
371+
372+
// Step 1. Get the filter
373+
if (updateParams.getFilter() == null) {
374+
filterListener.onResponse(null);
352375
} else {
353-
eventsListener.onResponse(null);
376+
GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request();
377+
getFilterRequest.setFilterId(updateParams.getFilter().getId());
378+
executeAsyncWithOrigin(client, ML_ORIGIN, GetFiltersAction.INSTANCE, getFilterRequest, new ActionListener<GetFiltersAction.Response>() {
379+
380+
@Override
381+
public void onResponse(GetFiltersAction.Response response) {
382+
filterListener.onResponse(response.getFilters().results().get(0));
383+
}
384+
385+
@Override
386+
public void onFailure(Exception e) {
387+
handler.accept(e);
388+
}
389+
});
354390
}
355391
}
356392

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java

+9
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@ public MlFilter getFilter() {
4949
return filter;
5050
}
5151

52+
/**
53+
* Returns true if the update params include a job update,
54+
* ie an update to the job config directly rather than an
55+
* update to external resources a job uses (e.g. calendars, filters).
56+
*/
57+
public boolean isJobUpdate() {
58+
return modelPlotConfig != null || detectorUpdates != null;
59+
}
60+
5261
public boolean isUpdateScheduledEvents() {
5362
return updateScheduledEvents;
5463
}

0 commit comments

Comments
 (0)