Skip to content

Commit fbabd81

Browse files
authored
[ML] Allow stop unassigned datafeed and relax unset upgrade mode wait (#39034)
These two changes are interlinked. Before this change unsetting ML upgrade mode would wait for all datafeeds to be assigned and not waiting for their corresponding jobs to initialise. However, this could be inappropriate, if there was a reason other that upgrade mode why one job was unable to be assigned or slow to start up. Unsetting of upgrade mode would hang in this case. This change relaxes the condition for considering upgrade mode to be unset to simply that an assignment attempt has been made for each ML persistent task that did not fail because upgrade mode was enabled. Thus after unsetting upgrade mode there is no guarantee that every ML persistent task is assigned, just that each is not unassigned due to upgrade mode. In order to make setting upgrade mode work immediately after unsetting upgrade mode it was then also necessary to make it possible to stop a datafeed that was not assigned. There was no particularly good reason why this was not allowed in the past. It is trivial to stop an unassigned datafeed because it just involves removing the persistent task.
1 parent 01d6263 commit fbabd81

File tree

4 files changed

+34
-40
lines changed

4 files changed

+34
-40
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -197,13 +197,9 @@ protected void masterOperation(SetUpgradeModeAction.Request request, ClusterStat
197197
(t) -> t.getAssignment().equals(AWAITING_UPGRADE))
198198
.isEmpty() &&
199199

200-
// Datafeeds to wait for a non-"Awaiting upgrade" assignment and for the job task allocations to converge
201-
// If we do not wait, deleting datafeeds, or attempting to unallocate them again causes issues as the
202-
// job's task allocationId could have changed during either process.
200+
// Wait for datafeeds to not be "Awaiting upgrade"
203201
persistentTasksCustomMetaData.findTasks(DATAFEED_TASK_NAME,
204-
(t) ->
205-
t.getAssignment().equals(AWAITING_UPGRADE) ||
206-
t.getAssignment().getExplanation().contains("state is stale"))
202+
(t) -> t.getAssignment().equals(AWAITING_UPGRADE))
207203
.isEmpty(),
208204
request.timeout(),
209205
ActionListener.wrap(r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), wrappedListener::onFailure)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.elasticsearch.xpack.core.ml.MlTasks;
3030
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
3131
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
32-
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
3332
import org.elasticsearch.xpack.ml.MachineLearning;
3433
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
3534

@@ -104,7 +103,7 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
104103
final DiscoveryNodes nodes = state.nodes();
105104
if (nodes.isLocalNodeElectedMaster() == false) {
106105
// Delegates stop datafeed to elected master node, so it becomes the coordinating node.
107-
// See comment in StartDatafeedAction.Transport class for more information.
106+
// See comment in TransportStartDatafeedAction for more information.
108107
if (nodes.getMasterNode() == null) {
109108
listener.onFailure(new MasterNotDiscoveredException("no known master node"));
110109
} else {
@@ -142,13 +141,21 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A
142141
Set<String> executorNodes = new HashSet<>();
143142
for (String datafeedId : startedDatafeeds) {
144143
PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
145-
if (datafeedTask == null || datafeedTask.isAssigned() == false) {
146-
String message = "Cannot stop datafeed [" + datafeedId + "] because the datafeed does not have an assigned node." +
147-
" Use force stop to stop the datafeed";
148-
listener.onFailure(ExceptionsHelper.conflictStatusException(message));
149-
return;
150-
} else {
144+
if (datafeedTask == null) {
145+
// This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
146+
String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found.";
147+
assert datafeedTask != null : msg;
148+
logger.error(msg);
149+
} else if (datafeedTask.isAssigned()) {
151150
executorNodes.add(datafeedTask.getExecutorNode());
151+
} else {
152+
// This is the easy case - the datafeed is not currently assigned to a node,
153+
// so can be gracefully stopped simply by removing its persistent task. (Usually
154+
// a graceful stop cannot be achieved by simply removing the persistent task, but
155+
// if the datafeed has no running code then graceful/forceful are the same.)
156+
// The listener here can be a no-op, as waitForDatafeedStopped() already waits for
157+
// these persistent tasks to disappear.
158+
persistentTasksService.sendRemoveRequest(datafeedTask.getId(), ActionListener.wrap(r -> {}, e -> {}));
152159
}
153160
}
154161

@@ -198,9 +205,10 @@ public void onFailure(Exception e) {
198205
}
199206
});
200207
} else {
201-
String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " +
202-
"datafeed's task could not be found.";
203-
logger.warn(msg);
208+
// This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
209+
String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found.";
210+
assert datafeedTask != null : msg;
211+
logger.error(msg);
204212
final int slot = counter.incrementAndGet();
205213
failures.set(slot - 1, new RuntimeException(msg));
206214
if (slot == startedDatafeeds.size()) {
@@ -248,19 +256,18 @@ protected void doRun() throws Exception {
248256

249257
private void sendResponseOrFailure(String datafeedId, ActionListener<StopDatafeedAction.Response> listener,
250258
AtomicArray<Exception> failures) {
251-
List<Exception> catchedExceptions = failures.asList();
252-
if (catchedExceptions.size() == 0) {
259+
List<Exception> caughtExceptions = failures.asList();
260+
if (caughtExceptions.size() == 0) {
253261
listener.onResponse(new StopDatafeedAction.Response(true));
254262
return;
255263
}
256264

257-
String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + catchedExceptions.size()
265+
String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + caughtExceptions.size()
258266
+ "] failures, rethrowing last, all Exceptions: ["
259-
+ catchedExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
267+
+ caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
260268
+ "]";
261269

262-
ElasticsearchException e = new ElasticsearchException(msg,
263-
catchedExceptions.get(0));
270+
ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
264271
listener.onFailure(e);
265272
}
266273

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -159,22 +159,15 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception {
159159
client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet();
160160
assertEquals(DatafeedState.STARTED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState());
161161

162-
// Can't normal stop an unassigned datafeed
162+
// An unassigned datafeed can be stopped either normally or by force
163163
StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId);
164-
ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class,
165-
() -> client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet());
166-
assertEquals("Cannot stop datafeed [" + datafeedId +
167-
"] because the datafeed does not have an assigned node. Use force stop to stop the datafeed",
168-
statusException.getMessage());
169-
170-
// Can only force stop an unassigned datafeed
171-
stopDatafeedRequest.setForce(true);
164+
stopDatafeedRequest.setForce(randomBoolean());
172165
StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet();
173166
assertTrue(stopDatafeedResponse.isStopped());
174167

175168
// Can't normal stop an unassigned job
176169
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId);
177-
statusException = expectThrows(ElasticsearchStatusException.class,
170+
ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class,
178171
() -> client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet());
179172
assertEquals("Cannot close job [" + jobId +
180173
"] because the job does not have an assigned node. Use force close to close the job",

x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -206,20 +206,18 @@ teardown:
206206
ml.get_datafeed_stats:
207207
datafeed_id: set-upgrade-mode-job-datafeed
208208
- match: { datafeeds.0.state: "started" }
209-
- match: { datafeeds.0.assignment_explanation: "" }
209+
# The datafeed will not be assigned until the job has updated its status on the node it's assigned
210+
# to, and that probably won't happen in time for this assertion. That is indicated by an assignment
211+
# reason ending "state is stale". However, the datafeed should NOT be unassigned with a reason of
212+
# "upgrade mode is enabled" - that reason should have gone away before this test.
213+
- match: { datafeeds.0.assignment_explanation: /(^$|.+job.+state.is.stale)/ }
210214

211215
- do:
212216
cat.tasks: {}
213217
- match:
214218
$body: |
215219
/.+job.+/
216220
217-
- do:
218-
cat.tasks: {}
219-
- match:
220-
$body: |
221-
/.+datafeed.+/
222-
223221
---
224222
"Attempt to open job when upgrade_mode is enabled":
225223
- do:

0 commit comments

Comments
 (0)