Skip to content

Commit e4ff839

Browse files
authored
[ML] Skip renormalization after node shutdown API called (#89347)
The node shutdown API can be used to inform a node that it is about to be shut down. When this happens we gracefully stop the jobs on the node, and each job persists its state. The persisted state includes the latest quantiles, and receipt of new quantiles usually triggers a renormalization. However, when a node shutdown is impending we do not want to kick off new processes that may delay the shutdown. This PR changes the anomaly detection job results processor so that it does not trigger a renormalization for quantiles received after the node shutdown message has been received.
1 parent 00d4953 commit e4ff839

File tree

5 files changed

+50
-1
lines changed

5 files changed

+50
-1
lines changed

docs/changelog/89347.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 89347
2+
summary: Skip renormalization after node shutdown API called
3+
area: Machine Learning
4+
type: enhancement
5+
issues: []

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,4 +414,8 @@ private void createCategorizationAnalyzer(AnalysisRegistry analysisRegistry) thr
414414
}
415415
categorizationAnalyzer = new CategorizationAnalyzer(analysisRegistry, categorizationAnalyzerConfig);
416416
}
417+
418+
public void setVacating(boolean vacating) {
419+
autodetectResultProcessor.setVacating(vacating);
420+
}
417421
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,7 @@ private void closeProcessAndTask(ProcessContext processContext, JobTask jobTask,
901901
if (jobKilled) {
902902
communicator.killProcess(true, false, false);
903903
} else {
904+
communicator.setVacating(jobTask.isVacating());
904905
// communicator.close() may take a long time to run, if the job persists a large model state as a
905906
// result of calling it. We want to leave open the option to kill the job during this time, which
906907
// is why the allocation ID must remain in the map until after the close is complete.

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public class AutodetectResultProcessor {
9595
final Semaphore updateModelSnapshotSemaphore = new Semaphore(1);
9696
private final FlushListener flushListener;
9797
private volatile boolean processKilled;
98+
private volatile boolean vacating;
9899
private volatile boolean failed;
99100
private final Map<String, ForecastRequestStats> runningForecasts;
100101
private final long priorRunsBucketCount;
@@ -233,6 +234,7 @@ private void readResults() {
233234

234235
public void setProcessKilled() {
235236
processKilled = true;
237+
vacating = false;
236238
try {
237239
renormalizer.shutdown();
238240
} catch (InterruptedException e) {
@@ -241,6 +243,10 @@ public void setProcessKilled() {
241243
}
242244
}
243245

246+
public void setVacating(boolean vacating) {
247+
this.vacating = vacating;
248+
}
249+
244250
void handleOpenForecasts() {
245251
try {
246252
if (runningForecasts.isEmpty() == false) {
@@ -360,7 +366,8 @@ void processResult(AutodetectResult result) {
360366
persister.persistQuantiles(quantiles, this::isAlive);
361367
bulkResultsPersister.executeRequest();
362368

363-
if (processKilled == false && renormalizer.isEnabled()) {
369+
// If a node is trying to shut down then don't trigger any further normalizations on the node
370+
if (vacating == false && processKilled == false && renormalizer.isEnabled()) {
364371
// We need to make all results written up to these quantiles available for renormalization
365372
persister.commitResultWrites(jobId);
366373
LOGGER.debug("[{}] Quantiles queued for renormalization", jobId);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,38 @@ public void testCloseJob() {
420420
assertEquals(1, manager.numberOfOpenJobs());
421421
manager.closeJob(jobTask, null);
422422
assertEquals(0, manager.numberOfOpenJobs());
423+
verify(autodetectCommunicator).setVacating(false);
424+
}
425+
426+
public void testVacate() {
427+
ExecutorService executorService = mock(ExecutorService.class);
428+
doAnswer(invocationOnMock -> {
429+
((Runnable) invocationOnMock.getArguments()[0]).run();
430+
return null;
431+
}).when(executorService).execute(any(Runnable.class));
432+
when(threadPool.executor(anyString())).thenReturn(executorService);
433+
AutodetectProcessManager manager = createSpyManager();
434+
assertEquals(0, manager.numberOfOpenJobs());
435+
436+
JobTask jobTask = mock(JobTask.class);
437+
when(jobTask.getJobId()).thenReturn("foo");
438+
when(jobTask.triggerVacate()).thenReturn(true);
439+
manager.openJob(jobTask, clusterState, DEFAULT_MASTER_NODE_TIMEOUT, (e, b) -> {});
440+
manager.processData(
441+
jobTask,
442+
analysisRegistry,
443+
createInputStream(""),
444+
randomFrom(XContentType.values()),
445+
mock(DataLoadParams.class),
446+
(dataCounts1, e) -> {}
447+
);
448+
449+
// job is created
450+
assertEquals(1, manager.numberOfOpenJobs());
451+
when(jobTask.isVacating()).thenReturn(true);
452+
manager.vacateOpenJobsOnThisNode();
453+
assertEquals(0, manager.numberOfOpenJobs());
454+
verify(autodetectCommunicator).setVacating(true);
423455
}
424456

425457
public void testCanCloseClosingJob() throws Exception {

0 commit comments

Comments
 (0)