Skip to content

Commit c7a1ce2

Browse files
[ML] Fix calendar and filter updates from non-master nodes (#31804)
Job updates or changes to calendars or filters may result into updating the job process if it has been running. To preserve the order of updates, process updates are queued through the UpdateJobProcessNotifier which is only running on the master node. All actions performing such updates must run on the master node. However, the CRUD actions for calendars and filters are not master node actions. They have been submitting the updates to the UpdateJobProcessNotifier even though it might have not been running (given the action was run on a non-master node). When that happens, the update never reaches the process. This commit fixes this problem by ensuring the notifier runs on all nodes and by ensuring the process update action gets the resources again before updating the process (instead of having those resources passed in the request). This ensures that even if the order of the updates gets messed up, the latest update will read the latest state of those resource and the process will get back in sync. This leaves us with 2 types of updates: 1. updates to the job config should happen on the master node. This is because we cannot refetch the entire job and update it. We need to know the parts that have been changed. 2. updates to resources the job uses. Those can be handled on non-master nodes but they should be re-fetched by the update process action. Closes #31803
1 parent 46582eb commit c7a1ce2

File tree

7 files changed

+212
-62
lines changed

7 files changed

+212
-62
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java

+36-19
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55
*/
66
package org.elasticsearch.xpack.ml.job;
77

8+
import org.apache.logging.log4j.Logger;
89
import org.elasticsearch.ElasticsearchStatusException;
910
import org.elasticsearch.ResourceNotFoundException;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.client.Client;
12-
import org.elasticsearch.cluster.LocalNodeMasterListener;
1313
import org.elasticsearch.cluster.service.ClusterService;
1414
import org.elasticsearch.common.component.AbstractComponent;
1515
import org.elasticsearch.common.component.LifecycleListener;
16+
import org.elasticsearch.common.logging.Loggers;
1617
import org.elasticsearch.common.settings.Settings;
1718
import org.elasticsearch.common.unit.TimeValue;
1819
import org.elasticsearch.threadpool.ThreadPool;
@@ -31,9 +32,26 @@
3132
import static org.elasticsearch.xpack.core.ml.action.UpdateProcessAction.Request;
3233
import static org.elasticsearch.xpack.core.ml.action.UpdateProcessAction.Response;
3334

34-
public class UpdateJobProcessNotifier extends AbstractComponent implements LocalNodeMasterListener {
35+
/**
36+
* This class serves as a queue for updates to the job process.
37+
* Queueing is important for 2 reasons: first, it throttles the updates
38+
* to the process, and second and most important, it preserves the order of the updates
39+
* for actions that run on the master node. For preserving the order of the updates
40+
* to the job config, it's necessary to handle the whole update chain on the master
41+
* node. However, for updates to resources the job uses (e.g. calendars, filters),
42+
* they can be handled on non-master nodes as long as the update process action
43+
* is fetching the latest version of those resources from the index instead of
44+
* using the version that existed while the handling action was at work. This makes
45+
* sure that even if the order of updates gets reversed, the final process update
46+
* will fetch the valid state of those external resources ensuring the process is
47+
* in sync.
48+
*/
49+
public class UpdateJobProcessNotifier extends AbstractComponent {
50+
51+
private static final Logger LOGGER = Loggers.getLogger(UpdateJobProcessNotifier.class);
3552

3653
private final Client client;
54+
private final ClusterService clusterService;
3755
private final ThreadPool threadPool;
3856
private final LinkedBlockingQueue<UpdateHolder> orderedJobUpdates = new LinkedBlockingQueue<>(1000);
3957

@@ -42,9 +60,15 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
4260
public UpdateJobProcessNotifier(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool) {
4361
super(settings);
4462
this.client = client;
63+
this.clusterService = clusterService;
4564
this.threadPool = threadPool;
46-
clusterService.addLocalNodeMasterListener(this);
4765
clusterService.addLifecycleListener(new LifecycleListener() {
66+
67+
@Override
68+
public void beforeStart() {
69+
start();
70+
}
71+
4872
@Override
4973
public void beforeStop() {
5074
stop();
@@ -56,16 +80,6 @@ boolean submitJobUpdate(UpdateParams update, ActionListener<Boolean> listener) {
5680
return orderedJobUpdates.offer(new UpdateHolder(update, listener));
5781
}
5882

59-
@Override
60-
public void onMaster() {
61-
start();
62-
}
63-
64-
@Override
65-
public void offMaster() {
66-
stop();
67-
}
68-
6983
private void start() {
7084
cancellable = threadPool.scheduleWithFixedDelay(this::processNextUpdate, TimeValue.timeValueSeconds(1), ThreadPool.Names.GENERIC);
7185
}
@@ -79,12 +93,6 @@ private void stop() {
7993
}
8094
}
8195

82-
@Override
83-
public String executorName() {
84-
// SAME is ok here, because both start() and stop() are inexpensive:
85-
return ThreadPool.Names.SAME;
86-
}
87-
8896
private void processNextUpdate() {
8997
List<UpdateHolder> updates = new ArrayList<>(orderedJobUpdates.size());
9098
try {
@@ -101,6 +109,15 @@ void executeProcessUpdates(Iterator<UpdateHolder> updatesIterator) {
101109
}
102110
UpdateHolder updateHolder = updatesIterator.next();
103111
UpdateParams update = updateHolder.update;
112+
113+
if (update.isJobUpdate() && clusterService.localNode().isMasterNode() == false) {
114+
assert clusterService.localNode().isMasterNode();
115+
LOGGER.error("Job update was submitted to non-master node [" + clusterService.nodeName() + "]; update for job ["
116+
+ update.getJobId() + "] will be ignored");
117+
executeProcessUpdates(updatesIterator);
118+
return;
119+
}
120+
104121
Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(),
105122
update.isUpdateScheduledEvents());
106123

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java

+18-21
Original file line numberDiff line numberDiff line change
@@ -16,36 +16,34 @@
1616
import org.elasticsearch.common.xcontent.XContentType;
1717
import org.elasticsearch.env.Environment;
1818
import org.elasticsearch.index.analysis.AnalysisRegistry;
19-
import org.elasticsearch.xpack.ml.MachineLearning;
20-
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
21-
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
2219
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
2320
import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig;
2421
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
2522
import org.elasticsearch.xpack.core.ml.job.config.Job;
2623
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
24+
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
25+
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
26+
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
27+
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
28+
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
29+
import org.elasticsearch.xpack.ml.MachineLearning;
30+
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
2731
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
2832
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
2933
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
3034
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
31-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
3235
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
3336
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
3437
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
35-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
36-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
37-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
3838
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter;
3939
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory;
40-
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
4140

4241
import java.io.Closeable;
4342
import java.io.IOException;
4443
import java.io.InputStream;
4544
import java.time.Duration;
4645
import java.time.ZonedDateTime;
4746
import java.util.Collections;
48-
import java.util.List;
4947
import java.util.Locale;
5048
import java.util.Optional;
5149
import java.util.concurrent.CountDownLatch;
@@ -205,30 +203,29 @@ public void killProcess(boolean awaitCompletion, boolean finish) throws IOExcept
205203
}
206204
}
207205

208-
public void writeUpdateProcessMessage(UpdateParams updateParams, List<ScheduledEvent> scheduledEvents,
209-
BiConsumer<Void, Exception> handler) {
206+
public void writeUpdateProcessMessage(UpdateProcessMessage update, BiConsumer<Void, Exception> handler) {
210207
submitOperation(() -> {
211-
if (updateParams.getModelPlotConfig() != null) {
212-
autodetectProcess.writeUpdateModelPlotMessage(updateParams.getModelPlotConfig());
208+
if (update.getModelPlotConfig() != null) {
209+
autodetectProcess.writeUpdateModelPlotMessage(update.getModelPlotConfig());
213210
}
214211

215212
// Filters have to be written before detectors
216-
if (updateParams.getFilter() != null) {
217-
autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(updateParams.getFilter()));
213+
if (update.getFilter() != null) {
214+
autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(update.getFilter()));
218215
}
219216

220217
// Add detector rules
221-
if (updateParams.getDetectorUpdates() != null) {
222-
for (JobUpdate.DetectorUpdate update : updateParams.getDetectorUpdates()) {
223-
if (update.getRules() != null) {
224-
autodetectProcess.writeUpdateDetectorRulesMessage(update.getDetectorIndex(), update.getRules());
218+
if (update.getDetectorUpdates() != null) {
219+
for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) {
220+
if (detectorUpdate.getRules() != null) {
221+
autodetectProcess.writeUpdateDetectorRulesMessage(detectorUpdate.getDetectorIndex(), detectorUpdate.getRules());
225222
}
226223
}
227224
}
228225

229226
// Add scheduled events; null means there's no update but an empty list means we should clear any events in the process
230-
if (scheduledEvents != null) {
231-
autodetectProcess.writeUpdateScheduledEventsMessage(scheduledEvents, job.getAnalysisConfig().getBucketSpan());
227+
if (update.getScheduledEvents() != null) {
228+
autodetectProcess.writeUpdateScheduledEventsMessage(update.getScheduledEvents(), job.getAnalysisConfig().getBucketSpan());
232229
}
233230

234231
return null;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

+55-19
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.ml.job.process.autodetect;
77

8-
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
9-
import org.elasticsearch.core.internal.io.IOUtils;
108
import org.elasticsearch.ElasticsearchStatusException;
119
import org.elasticsearch.action.ActionListener;
1210
import org.elasticsearch.client.Client;
@@ -19,35 +17,38 @@
1917
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2018
import org.elasticsearch.common.util.concurrent.ThreadContext;
2119
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
22-
import org.elasticsearch.common.xcontent.XContentBuilder;
20+
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
2321
import org.elasticsearch.common.xcontent.XContentType;
22+
import org.elasticsearch.core.internal.io.IOUtils;
2423
import org.elasticsearch.env.Environment;
2524
import org.elasticsearch.index.analysis.AnalysisRegistry;
25+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
2626
import org.elasticsearch.rest.RestStatus;
2727
import org.elasticsearch.threadpool.ThreadPool;
28+
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
2829
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
2930
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
3031
import org.elasticsearch.xpack.core.ml.job.config.Job;
3132
import org.elasticsearch.xpack.core.ml.job.config.JobState;
3233
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
33-
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
34+
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
3435
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
35-
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
3636
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
3737
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
3838
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
3939
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
40-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
4140
import org.elasticsearch.xpack.ml.MachineLearning;
4241
import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask;
4342
import org.elasticsearch.xpack.ml.job.JobManager;
4443
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
4544
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
4645
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
4746
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
47+
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
4848
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
4949
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
5050
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
51+
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
5152
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
5253
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
5354
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
@@ -78,6 +79,8 @@
7879
import java.util.function.Consumer;
7980

8081
import static org.elasticsearch.common.settings.Setting.Property;
82+
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
83+
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
8184

8285
public class AutodetectProcessManager extends AbstractComponent {
8386

@@ -135,7 +138,7 @@ public AutodetectProcessManager(Environment environment, Settings settings, Clie
135138
this.auditor = auditor;
136139
}
137140

138-
public synchronized void closeAllJobsOnThisNode(String reason) throws IOException {
141+
public synchronized void closeAllJobsOnThisNode(String reason) {
139142
int numJobs = processByAllocation.size();
140143
if (numJobs != 0) {
141144
logger.info("Closing [{}] jobs, because [{}]", numJobs, reason);
@@ -278,8 +281,7 @@ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Excepti
278281
});
279282
}
280283

281-
public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams,
282-
Consumer<Exception> handler) {
284+
public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams, Consumer<Exception> handler) {
283285
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
284286
if (communicator == null) {
285287
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open";
@@ -288,25 +290,59 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams
288290
return;
289291
}
290292

293+
UpdateProcessMessage.Builder updateProcessMessage = new UpdateProcessMessage.Builder();
294+
updateProcessMessage.setModelPlotConfig(updateParams.getModelPlotConfig());
295+
updateProcessMessage.setDetectorUpdates(updateParams.getDetectorUpdates());
296+
297+
// Step 3. Set scheduled events on message and write update process message
291298
ActionListener<QueryPage<ScheduledEvent>> eventsListener = ActionListener.wrap(
292299
events -> {
293-
communicator.writeUpdateProcessMessage(updateParams, events == null ? null : events.results(), (aVoid, e) -> {
300+
updateProcessMessage.setScheduledEvents(events == null ? null : events.results());
301+
communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> {
294302
if (e == null) {
295303
handler.accept(null);
296304
} else {
297305
handler.accept(e);
298306
}
299307
});
300-
},
301-
handler::accept);
302-
303-
if (updateParams.isUpdateScheduledEvents()) {
304-
Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId());
305-
DataCounts dataCounts = getStatistics(jobTask).get().v1();
306-
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts));
307-
jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
308+
}, handler
309+
);
310+
311+
// Step 2. Set the filter on the message and get scheduled events
312+
ActionListener<MlFilter> filterListener = ActionListener.wrap(
313+
filter -> {
314+
updateProcessMessage.setFilter(filter);
315+
316+
if (updateParams.isUpdateScheduledEvents()) {
317+
Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId());
318+
DataCounts dataCounts = getStatistics(jobTask).get().v1();
319+
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts));
320+
jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
321+
} else {
322+
eventsListener.onResponse(null);
323+
}
324+
}, handler
325+
);
326+
327+
// Step 1. Get the filter
328+
if (updateParams.getFilter() == null) {
329+
filterListener.onResponse(null);
308330
} else {
309-
eventsListener.onResponse(null);
331+
GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request();
332+
getFilterRequest.setFilterId(updateParams.getFilter().getId());
333+
executeAsyncWithOrigin(client, ML_ORIGIN, GetFiltersAction.INSTANCE, getFilterRequest,
334+
new ActionListener<GetFiltersAction.Response>() {
335+
336+
@Override
337+
public void onResponse(GetFiltersAction.Response response) {
338+
filterListener.onResponse(response.getFilters().results().get(0));
339+
}
340+
341+
@Override
342+
public void onFailure(Exception e) {
343+
handler.accept(e);
344+
}
345+
});
310346
}
311347
}
312348

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java

+9
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@ public MlFilter getFilter() {
4949
return filter;
5050
}
5151

52+
/**
53+
* Returns true if the update params include a job update,
54+
* ie an update to the job config directly rather than an
55+
* update to external resources a job uses (e.g. calendars, filters).
56+
*/
57+
public boolean isJobUpdate() {
58+
return modelPlotConfig != null || detectorUpdates != null;
59+
}
60+
5261
public boolean isUpdateScheduledEvents() {
5362
return updateScheduledEvents;
5463
}

0 commit comments

Comments
 (0)