Skip to content

Commit 86b01f5

Browse files
committed
[ML] Wait for autodetect to be ready in the datafeed (#37349)
This is a reinforcement of #37227. It turns out that persistent tasks are not made stale if the node they were running on is restarted and the master node does not notice this. The main scenario where this happens is when minimum master nodes is the same as the number of nodes in the cluster, so the cluster cannot elect a master node when any node is restarted. When an ML node restarts we need the datafeeds for any jobs that were running on that node to not just wait until the jobs are allocated, but to wait for the autodetect process of the job to start up. In the case of reassignment of the job persistent task this was dealt with by the stale status test. But in the case where a node restarts but its persistent tasks are not reassigned we need a deeper test. Fixes #36810
1 parent aa6cec7 commit 86b01f5

File tree

5 files changed

+98
-18
lines changed

5 files changed

+98
-18
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
429429
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry,
430430
auditor, System::currentTimeMillis);
431431
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
432-
System::currentTimeMillis, auditor);
432+
System::currentTimeMillis, auditor, autodetectProcessManager);
433433
this.datafeedManager.set(datafeedManager);
434434
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager,
435435
autodetectProcessManager);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@
2727
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
2828
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
2929
import org.elasticsearch.xpack.core.ml.job.config.JobState;
30+
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
3031
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3132
import org.elasticsearch.xpack.ml.MachineLearning;
3233
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
34+
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
3335
import org.elasticsearch.xpack.ml.notifications.Auditor;
3436

3537
import java.util.ArrayList;
@@ -62,16 +64,18 @@ public class DatafeedManager {
6264
private final ConcurrentMap<Long, Holder> runningDatafeedsOnThisNode = new ConcurrentHashMap<>();
6365
private final DatafeedJobBuilder datafeedJobBuilder;
6466
private final TaskRunner taskRunner = new TaskRunner();
67+
private final AutodetectProcessManager autodetectProcessManager;
6568
private volatile boolean isolated;
6669

6770
public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
68-
Supplier<Long> currentTimeSupplier, Auditor auditor) {
71+
Supplier<Long> currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) {
6972
this.client = Objects.requireNonNull(client);
7073
this.clusterService = Objects.requireNonNull(clusterService);
7174
this.threadPool = threadPool;
7275
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
7376
this.auditor = Objects.requireNonNull(auditor);
7477
this.datafeedJobBuilder = Objects.requireNonNull(datafeedJobBuilder);
78+
this.autodetectProcessManager = autodetectProcessManager;
7579
clusterService.addListener(taskRunner);
7680
}
7781

@@ -256,6 +260,21 @@ private JobState getJobState(PersistentTasksCustomMetaData tasks, TransportStart
256260
return MlTasks.getJobStateModifiedForReassignments(getJobId(datafeedTask), tasks);
257261
}
258262

263+
private boolean jobHasOpenAutodetectCommunicator(PersistentTasksCustomMetaData tasks,
264+
TransportStartDatafeedAction.DatafeedTask datafeedTask) {
265+
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(getJobId(datafeedTask), tasks);
266+
if (jobTask == null) {
267+
return false;
268+
}
269+
270+
JobTaskState state = (JobTaskState) jobTask.getState();
271+
if (state == null || state.isStatusStale(jobTask)) {
272+
return false;
273+
}
274+
275+
return autodetectProcessManager.hasOpenAutodetectCommunicator(jobTask.getAllocationId());
276+
}
277+
259278
private TimeValue computeNextDelay(long next) {
260279
return new TimeValue(Math.max(1, next - currentTimeSupplier.get()));
261280
}
@@ -446,7 +465,7 @@ private class TaskRunner implements ClusterStateListener {
446465
private void runWhenJobIsOpened(TransportStartDatafeedAction.DatafeedTask datafeedTask) {
447466
ClusterState clusterState = clusterService.state();
448467
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
449-
if (getJobState(tasks, datafeedTask) == JobState.OPENED) {
468+
if (getJobState(tasks, datafeedTask) == JobState.OPENED && jobHasOpenAutodetectCommunicator(tasks, datafeedTask)) {
450469
runTask(datafeedTask);
451470
} else {
452471
logger.info("Datafeed [{}] is waiting for job [{}] to be opened",
@@ -485,10 +504,10 @@ public void clusterChanged(ClusterChangedEvent event) {
485504
continue;
486505
}
487506
JobState jobState = getJobState(currentTasks, datafeedTask);
488-
if (jobState == JobState.OPENED) {
489-
runTask(datafeedTask);
490-
} else if (jobState == JobState.OPENING) {
507+
if (jobState == JobState.OPENING || jobHasOpenAutodetectCommunicator(currentTasks, datafeedTask) == false) {
491508
remainingTasks.add(datafeedTask);
509+
} else if (jobState == JobState.OPENED) {
510+
runTask(datafeedTask);
492511
} else {
493512
logger.warn("Datafeed [{}] is stopping because job [{}] state is [{}]",
494513
datafeedTask.getDatafeedId(), getJobId(datafeedTask), jobState);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,13 @@ public void killAllProcessesOnThisNode() {
215215
*/
216216
public void persistJob(JobTask jobTask, Consumer<Exception> handler) {
217217
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
218+
if (communicator == null) {
219+
String message = String.format(Locale.ROOT, "Cannot persist because job [%s] does not have a corresponding autodetect process",
220+
jobTask.getJobId());
221+
logger.debug(message);
222+
handler.accept(ExceptionsHelper.conflictStatusException(message));
223+
return;
224+
}
218225
communicator.persistJob((aVoid, e) -> handler.accept(e));
219226
}
220227

@@ -242,7 +249,8 @@ public void processData(JobTask jobTask, AnalysisRegistry analysisRegistry, Inpu
242249
XContentType xContentType, DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
243250
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
244251
if (communicator == null) {
245-
throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + "] is not open");
252+
throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() +
253+
"] does not have a corresponding autodetect process");
246254
}
247255
communicator.writeToJob(input, analysisRegistry, xContentType, params, handler);
248256
}
@@ -260,7 +268,8 @@ public void flushJob(JobTask jobTask, FlushJobParams params, ActionListener<Flus
260268
logger.debug("Flushing job {}", jobTask.getJobId());
261269
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
262270
if (communicator == null) {
263-
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId());
271+
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] does not have a corresponding autodetect process",
272+
jobTask.getJobId());
264273
logger.debug(message);
265274
handler.onFailure(ExceptionsHelper.conflictStatusException(message));
266275
return;
@@ -310,7 +319,8 @@ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Excepti
310319
logger.debug("Forecasting job {}", jobId);
311320
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
312321
if (communicator == null) {
313-
String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobId);
322+
String message = String.format(Locale.ROOT,
323+
"Cannot forecast because job [%s] does not have a corresponding autodetect process", jobId);
314324
logger.debug(message);
315325
handler.accept(ExceptionsHelper.conflictStatusException(message));
316326
return;
@@ -330,7 +340,8 @@ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Excepti
330340
public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams, Consumer<Exception> handler) {
331341
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
332342
if (communicator == null) {
333-
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open";
343+
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() +
344+
"] does not have a corresponding autodetect process";
334345
logger.debug(message);
335346
handler.accept(ExceptionsHelper.conflictStatusException(message));
336347
return;
@@ -667,6 +678,14 @@ private AutodetectCommunicator getOpenAutodetectCommunicator(JobTask jobTask) {
667678
return null;
668679
}
669680

681+
public boolean hasOpenAutodetectCommunicator(long jobAllocationId) {
682+
ProcessContext processContext = processByAllocation.get(jobAllocationId);
683+
if (processContext != null && processContext.getState() == ProcessContext.ProcessStateName.RUNNING) {
684+
return processContext.getAutodetectCommunicator() != null;
685+
}
686+
return false;
687+
}
688+
670689
public Optional<Duration> jobOpenTime(JobTask jobTask) {
671690
AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask);
672691
if (communicator == null) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction.DatafeedTask;
4040
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedActionTests;
4141
import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder;
42+
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
4243
import org.elasticsearch.xpack.ml.notifications.Auditor;
4344
import org.junit.Before;
4445
import org.mockito.ArgumentCaptor;
@@ -48,6 +49,7 @@
4849
import java.util.Date;
4950
import java.util.concurrent.ExecutorService;
5051
import java.util.concurrent.ScheduledFuture;
52+
import java.util.concurrent.atomic.AtomicBoolean;
5153
import java.util.function.Consumer;
5254

5355
import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask;
@@ -74,13 +76,14 @@ public class DatafeedManagerTests extends ESTestCase {
7476
private long currentTime = 120000;
7577
private Auditor auditor;
7678
private ArgumentCaptor<ClusterStateListener> capturedClusterStateListener = ArgumentCaptor.forClass(ClusterStateListener.class);
79+
private AtomicBoolean hasOpenAutodetectCommunicator;
7780

7881
@Before
7982
@SuppressWarnings("unchecked")
8083
public void setUpTests() {
8184
Job.Builder job = createDatafeedJob().setCreateTime(new Date());
8285

83-
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
86+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
8487
addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
8588
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
8689
DiscoveryNodes nodes = DiscoveryNodes.builder()
@@ -128,7 +131,12 @@ public void setUpTests() {
128131
return null;
129132
}).when(datafeedJobBuilder).build(any(), any(), any());
130133

131-
datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor);
134+
hasOpenAutodetectCommunicator = new AtomicBoolean(true);
135+
AutodetectProcessManager autodetectProcessManager = mock(AutodetectProcessManager.class);
136+
doAnswer(invocation -> hasOpenAutodetectCommunicator.get()).when(autodetectProcessManager).hasOpenAutodetectCommunicator(anyLong());
137+
138+
datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor,
139+
autodetectProcessManager);
132140

133141
verify(clusterService).addListener(capturedClusterStateListener.capture());
134142
}
@@ -259,7 +267,7 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() {
259267
// Verify datafeed has not started running yet as job is still opening
260268
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
261269

262-
tasksBuilder = PersistentTasksCustomMetaData.builder();
270+
tasksBuilder = PersistentTasksCustomMetaData.builder();
263271
addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
264272
addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder);
265273
ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state())
@@ -270,15 +278,52 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() {
270278
// Still no run
271279
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
272280

273-
tasksBuilder = PersistentTasksCustomMetaData.builder();
281+
tasksBuilder = PersistentTasksCustomMetaData.builder();
274282
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
275283
ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state())
276284
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
277285

278286
capturedClusterStateListener.getValue().clusterChanged(
279287
new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build()));
280288

281-
// Now it should run as the job state chanded to OPENED
289+
// Now it should run as the job state changed to OPENED
290+
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
291+
}
292+
293+
public void testDatafeedTaskWaitsUntilAutodetectCommunicatorIsOpen() {
294+
295+
hasOpenAutodetectCommunicator.set(false);
296+
297+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
298+
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
299+
ClusterState.Builder cs = ClusterState.builder(clusterService.state())
300+
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
301+
when(clusterService.state()).thenReturn(cs.build());
302+
303+
Consumer<Exception> handler = mockConsumer();
304+
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
305+
datafeedManager.run(task, handler);
306+
307+
// Verify datafeed has not started running yet as job doesn't have an open autodetect communicator
308+
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
309+
310+
tasksBuilder = PersistentTasksCustomMetaData.builder();
311+
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
312+
addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder);
313+
ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state())
314+
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
315+
316+
capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build()));
317+
318+
// Still no run
319+
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
320+
321+
hasOpenAutodetectCommunicator.set(true);
322+
323+
capturedClusterStateListener.getValue().clusterChanged(
324+
new ClusterChangedEvent("_source", cs.build(), anotherJobCs.build()));
325+
326+
// Now it should run as the autodetect communicator is open
282327
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
283328
}
284329

x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ setup:
88

99
---
1010
"Test old cluster jobs and datafeeds and delete them":
11-
- skip:
12-
version: "all"
13-
reason: "@AwaitsFix: https://github.com/elastic/elasticsearch/issues/36810"
1411

1512
- do:
1613
xpack.ml.get_jobs:

0 commit comments

Comments
 (0)