Skip to content

Commit 3bf8e24

Browse files
authored
ML: update set_upgrade_mode, add logging (elastic#38372)
* ML: update set_upgrade_mode, add logging * Attempt to fix datafeed isolation Also renamed a few methods/variables for clarity and added some comments
1 parent 645db34 commit 3bf8e24

File tree

5 files changed

+67
-19
lines changed

5 files changed

+67
-19
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public final class Messages {
7979
public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]";
8080
public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time";
8181
public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped";
82+
public static final String JOB_AUDIT_DATAFEED_ISOLATED = "Datafeed isolated";
8283
public static final String JOB_AUDIT_DELETING = "Deleting job by task with id ''{0}''";
8384
public static final String JOB_AUDIT_DELETING_FAILED = "Error deleting job: {0}";
8485
public static final String JOB_AUDIT_DELETED = "Job deleted";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public synchronized void stop() {
4545
// datafeeds, so they get reallocated. We have to do this first, otherwise the datafeeds
4646
// could fail if they send data to a dead autodetect process.
4747
if (datafeedManager != null) {
48-
datafeedManager.isolateAllDatafeedsOnThisNode();
48+
datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown();
4949
}
5050
NativeController nativeController = NativeControllerHolder.getNativeController(environment);
5151
if (nativeController != null) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,9 @@ private void unassignPersistentTasks(PersistentTasksCustomMetaData tasksCustomMe
263263
.sorted(Comparator.comparing(PersistentTask::getTaskName))
264264
.collect(Collectors.toList());
265265

266+
logger.info("Un-assigning persistent tasks : " +
267+
datafeedAndJobTasks.stream().map(PersistentTask::getId).collect(Collectors.joining(", ", "[ ", " ]")));
268+
266269
TypedChainTaskExecutor<PersistentTask<?>> chainTaskExecutor =
267270
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()),
268271
r -> true,
@@ -287,6 +290,7 @@ private void isolateDatafeeds(PersistentTasksCustomMetaData tasksCustomMetaData,
287290
ActionListener<List<IsolateDatafeedAction.Response>> listener) {
288291
Set<String> datafeedsToIsolate = MlTasks.startedDatafeedIds(tasksCustomMetaData);
289292

293+
logger.info("Isolating datafeeds: " + datafeedsToIsolate.toString());
290294
TypedChainTaskExecutor<IsolateDatafeedAction.Response> isolateDatafeedsExecutor =
291295
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true);
292296

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ public class DatafeedManager {
6464
private final DatafeedJobBuilder datafeedJobBuilder;
6565
private final TaskRunner taskRunner = new TaskRunner();
6666
private final AutodetectProcessManager autodetectProcessManager;
67-
private volatile boolean isolated;
6867

6968
public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
7069
Supplier<Long> currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) {
@@ -130,18 +129,20 @@ public void stopAllDatafeedsOnThisNode(String reason) {
130129
* This is used before the JVM is killed. It differs from stopAllDatafeedsOnThisNode in that it leaves
131130
* the datafeed tasks in the "started" state, so that they get restarted on a different node.
132131
*/
133-
public void isolateAllDatafeedsOnThisNode() {
134-
isolated = true;
132+
public void isolateAllDatafeedsOnThisNodeBeforeShutdown() {
135133
Iterator<Holder> iter = runningDatafeedsOnThisNode.values().iterator();
136134
while (iter.hasNext()) {
137135
Holder next = iter.next();
138136
next.isolateDatafeed();
139-
next.setRelocating();
137+
// TODO: it's not ideal that this "isolate" method does something a bit different to the one below
138+
next.setNodeIsShuttingDown();
140139
iter.remove();
141140
}
142141
}
143142

144143
public void isolateDatafeed(long allocationId) {
144+
// This calls get() rather than remove() because we expect that the persistent task will
145+
// be removed shortly afterwards and that operation needs to be able to find the holder
145146
Holder holder = runningDatafeedsOnThisNode.get(allocationId);
146147
if (holder != null) {
147148
holder.isolateDatafeed();
@@ -195,7 +196,7 @@ protected void doRun() {
195196
holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e);
196197
return;
197198
}
198-
if (isolated == false) {
199+
if (holder.isIsolated() == false) {
199200
if (next != null) {
200201
doDatafeedRealtime(next, holder.datafeedJob.getJobId(), holder);
201202
} else {
@@ -298,7 +299,7 @@ public class Holder {
298299
private final ProblemTracker problemTracker;
299300
private final Consumer<Exception> finishHandler;
300301
volatile Scheduler.Cancellable cancellable;
301-
private volatile boolean isRelocating;
302+
private volatile boolean isNodeShuttingDown;
302303

303304
Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob,
304305
ProblemTracker problemTracker, Consumer<Exception> finishHandler) {
@@ -324,7 +325,7 @@ boolean isIsolated() {
324325
}
325326

326327
public void stop(String source, TimeValue timeout, Exception e) {
327-
if (isRelocating) {
328+
if (isNodeShuttingDown) {
328329
return;
329330
}
330331

@@ -344,11 +345,12 @@ public void stop(String source, TimeValue timeout, Exception e) {
344345
if (cancellable != null) {
345346
cancellable.cancel();
346347
}
347-
auditor.info(datafeedJob.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
348+
auditor.info(datafeedJob.getJobId(),
349+
Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED));
348350
finishHandler.accept(e);
349351
logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(),
350352
acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired");
351-
if (autoCloseJob) {
353+
if (autoCloseJob && isIsolated() == false) {
352354
closeJob();
353355
}
354356
if (acquired) {
@@ -361,16 +363,18 @@ public void stop(String source, TimeValue timeout, Exception e) {
361363
}
362364

363365
/**
364-
* This stops a datafeed WITHOUT updating the corresponding persistent task. It must ONLY be called
365-
* immediately prior to shutting down a node. Then the datafeed task can remain "started", and be
366-
* relocated to a different node. Calling this method at any other time will ruin the datafeed.
366+
* This stops a datafeed WITHOUT updating the corresponding persistent task. When called it
367+
* will stop the datafeed from sending data to its job as quickly as possible. The caller
368+
* must do something sensible with the corresponding persistent task. If the node is shutting
369+
* down the task will automatically get reassigned. Otherwise the caller must take action to
370+
* remove or reassign the persistent task, or the datafeed will be left in limbo.
367371
*/
368372
public void isolateDatafeed() {
369373
datafeedJob.isolate();
370374
}
371375

372-
public void setRelocating() {
373-
isRelocating = true;
376+
public void setNodeIsShuttingDown() {
377+
isNodeShuttingDown = true;
374378
}
375379

376380
private Long executeLookBack(long startTime, Long endTime) throws Exception {

x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ setup:
66
indices.create:
77
index: airline-data
88
body:
9+
settings:
10+
index:
11+
number_of_replicas: 0
12+
number_of_shards: 1
913
mappings:
1014
properties:
1115
time:
@@ -53,10 +57,9 @@ setup:
5357
job_id: set-upgrade-mode-job
5458

5559
- do:
56-
headers:
57-
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
58-
ml.start_datafeed:
59-
datafeed_id: set-upgrade-mode-job-datafeed
60+
cluster.health:
61+
index: airline-data
62+
wait_for_status: green
6063

6164
---
6265
teardown:
@@ -70,6 +73,10 @@ teardown:
7073

7174
---
7275
"Test setting upgrade_mode to false when it is already false":
76+
- do:
77+
ml.start_datafeed:
78+
datafeed_id: set-upgrade-mode-job-datafeed
79+
7380
- do:
7481
ml.set_upgrade_mode:
7582
enabled: false
@@ -92,6 +99,22 @@ teardown:
9299

93100
---
94101
"Setting upgrade_mode to enabled":
102+
- do:
103+
ml.start_datafeed:
104+
datafeed_id: set-upgrade-mode-job-datafeed
105+
106+
- do:
107+
cat.tasks: {}
108+
- match:
109+
$body: |
110+
/.+job.+/
111+
112+
- do:
113+
cat.tasks: {}
114+
- match:
115+
$body: |
116+
/.+datafeed.+/
117+
95118
- do:
96119
ml.info: {}
97120
- match: { upgrade_mode: false }
@@ -125,6 +148,22 @@ teardown:
125148

126149
---
127150
"Setting upgrade mode to disabled from enabled":
151+
- do:
152+
ml.start_datafeed:
153+
datafeed_id: set-upgrade-mode-job-datafeed
154+
155+
- do:
156+
cat.tasks: {}
157+
- match:
158+
$body: |
159+
/.+job.+/
160+
161+
- do:
162+
cat.tasks: {}
163+
- match:
164+
$body: |
165+
/.+datafeed.+/
166+
128167
- do:
129168
ml.set_upgrade_mode:
130169
enabled: true

0 commit comments

Comments
 (0)