Skip to content

Commit db0ea98

Browse files
[ML] Force stop deployment in use (#80431) (#80489)
Implements a `force` parameter to the stop deployment API. This allows a user to forcefully stop a deployment. Currently, this specifically allows stopping a deployment that is in use by ingest processors. Co-authored-by: Elastic Machine <[email protected]>
1 parent 6faead8 commit db0ea98

File tree

7 files changed

+156
-7
lines changed

7 files changed

+156
-7
lines changed

docs/reference/ml/df-analytics/apis/stop-trained-model-deployment.asciidoc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,18 @@ experimental::[]
3030
(Required, string)
3131
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=model-id]
3232

33-
////
33+
3434
[[stop-trained-model-deployment-query-params]]
3535
== {api-query-parms-title}
36-
////
36+
37+
`allow_no_match`::
38+
(Optional, Boolean)
39+
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=allow-no-match]
40+
41+
42+
`force`::
43+
(Optional, Boolean) If true, the deployment is stopped even if it is referenced by
44+
ingest pipelines.
3745

3846
////
3947
[role="child_attributes"]

rest-api-spec/src/main/resources/rest-api-spec/api/ml.stop_trained_model_deployment.json

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,21 @@
2626
}
2727
}
2828
]
29+
},
30+
"params":{
31+
"allow_no_match":{
32+
"type":"boolean",
33+
"required":false,
34+
"description":"Whether to ignore if a wildcard expression matches no deployments. (This includes `_all` string or when no deployments have been specified)"
35+
},
36+
"force":{
37+
"type":"boolean",
38+
"required":false,
39+
"description":"True if the deployment should be forcefully stopped"
40+
}
41+
},
42+
"body":{
43+
"description":"The stop deployment parameters"
2944
}
3045
}
3146
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopTrainedModelDeploymentAction.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,18 @@
1010
import org.elasticsearch.action.ActionType;
1111
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
1212
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
13+
import org.elasticsearch.common.Strings;
1314
import org.elasticsearch.common.io.stream.StreamInput;
1415
import org.elasticsearch.common.io.stream.StreamOutput;
1516
import org.elasticsearch.common.io.stream.Writeable;
1617
import org.elasticsearch.tasks.Task;
18+
import org.elasticsearch.xcontent.ObjectParser;
1719
import org.elasticsearch.xcontent.ParseField;
1820
import org.elasticsearch.xcontent.ToXContentObject;
1921
import org.elasticsearch.xcontent.XContentBuilder;
22+
import org.elasticsearch.xcontent.XContentParser;
2023
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
24+
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
2125
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2226

2327
import java.io.IOException;
@@ -41,6 +45,26 @@ public static class Request extends BaseTasksRequest<Request> implements ToXCont
4145
private boolean allowNoMatch = true;
4246
private boolean force;
4347

48+
private static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
49+
50+
static {
51+
PARSER.declareString(Request::setId, TrainedModelConfig.MODEL_ID);
52+
PARSER.declareBoolean(Request::setAllowNoMatch, ALLOW_NO_MATCH);
53+
PARSER.declareBoolean(Request::setForce, FORCE);
54+
}
55+
56+
public static Request parseRequest(String id, XContentParser parser) {
57+
Request request = PARSER.apply(parser, null);
58+
if (request.getId() == null) {
59+
request.setId(id);
60+
} else if (Strings.isNullOrEmpty(id) == false && id.equals(request.getId()) == false) {
61+
throw new IllegalArgumentException(
62+
Messages.getMessage(Messages.INCONSISTENT_ID, TrainedModelConfig.MODEL_ID, request.getId(), id)
63+
);
64+
}
65+
return request;
66+
}
67+
4468
public Request(String id) {
4569
setId(id);
4670
}
@@ -52,6 +76,8 @@ public Request(StreamInput in) throws IOException {
5276
force = in.readBoolean();
5377
}
5478

79+
private Request() {}
80+
5581
public final void setId(String id) {
5682
this.id = ExceptionsHelper.requireNonNull(id, TrainedModelConfig.MODEL_ID);
5783
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.ml.action;
9+
10+
import org.elasticsearch.common.io.stream.Writeable;
11+
import org.elasticsearch.test.AbstractSerializingTestCase;
12+
import org.elasticsearch.xcontent.XContentParser;
13+
import org.elasticsearch.xpack.core.ml.action.StopTrainedModelDeploymentAction.Request;
14+
15+
import java.io.IOException;
16+
17+
public class StopTrainedModelDeploymentRequestTests extends AbstractSerializingTestCase<Request> {
18+
19+
@Override
20+
protected Request doParseInstance(XContentParser parser) throws IOException {
21+
return Request.parseRequest(null, parser);
22+
}
23+
24+
@Override
25+
protected Writeable.Reader<Request> instanceReader() {
26+
return Request::new;
27+
}
28+
29+
@Override
30+
protected Request createTestInstance() {
31+
Request request = new Request(randomAlphaOfLength(10));
32+
if (randomBoolean()) {
33+
request.setAllowNoMatch(randomBoolean());
34+
}
35+
if (randomBoolean()) {
36+
request.setForce(randomBoolean());
37+
}
38+
return request;
39+
}
40+
}

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/PyTorchModelIT.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,42 @@ public void testInferencePipelineAgainstUnallocatedModel() throws IOException {
484484
);
485485
}
486486

487+
public void testStopUsedDeploymentByIngestProcessor() throws IOException {
488+
String modelId = "test_stop_used_deployment_by_ingest_processor";
489+
createTrainedModel(modelId);
490+
putModelDefinition(modelId);
491+
putVocabulary(List.of("these", "are", "my", "words"), modelId);
492+
startDeployment(modelId);
493+
494+
client().performRequest(
495+
putPipeline(
496+
"my_pipeline",
497+
"{"
498+
+ "\"processors\": [\n"
499+
+ " {\n"
500+
+ " \"inference\": {\n"
501+
+ " \"model_id\": \""
502+
+ modelId
503+
+ "\"\n"
504+
+ " }\n"
505+
+ " }\n"
506+
+ " ]\n"
507+
+ "}"
508+
)
509+
);
510+
ResponseException ex = expectThrows(ResponseException.class, () -> stopDeployment(modelId));
511+
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(409));
512+
assertThat(
513+
EntityUtils.toString(ex.getResponse().getEntity()),
514+
containsString(
515+
"Cannot stop deployment for model [test_stop_used_deployment_by_ingest_processor] as it is referenced by"
516+
+ " ingest processors; use force to stop the deployment"
517+
)
518+
);
519+
520+
stopDeployment(modelId, true);
521+
}
522+
487523
private int sumInferenceCountOnNodes(List<Map<String, Object>> nodes) {
488524
int inferenceCount = 0;
489525
for (var node : nodes) {
@@ -554,7 +590,15 @@ private Response startDeployment(String modelId, String waitForState) throws IOE
554590
}
555591

556592
private void stopDeployment(String modelId) throws IOException {
557-
Request request = new Request("POST", "/_ml/trained_models/" + modelId + "/deployment/_stop");
593+
stopDeployment(modelId, false);
594+
}
595+
596+
private void stopDeployment(String modelId, boolean force) throws IOException {
597+
String endpoint = "/_ml/trained_models/" + modelId + "/deployment/_stop";
598+
if (force) {
599+
endpoint += "?force=true";
600+
}
601+
Request request = new Request("POST", endpoint);
558602
client().performRequest(request);
559603
}
560604

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopTrainedModelDeploymentAction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,9 @@ protected void doExecute(
109109
return;
110110
}
111111

112-
logger.debug("[{}] Received request to undeploy", request.getId());
112+
logger.debug(
113+
() -> new ParameterizedMessage("[{}] Received request to undeploy{}", request.getId(), request.isForce() ? " (force)" : "")
114+
);
113115

114116
ActionListener<GetTrainedModelsAction.Response> getModelListener = ActionListener.wrap(getModelsResponse -> {
115117
List<TrainedModelConfig> models = getModelsResponse.getResources().results();
@@ -136,10 +138,10 @@ protected void doExecute(
136138
IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE);
137139
Set<String> referencedModels = getReferencedModelKeys(currentIngestMetadata, ingestService);
138140

139-
if (referencedModels.contains(modelId)) {
141+
if (request.isForce() == false && referencedModels.contains(modelId)) {
140142
listener.onFailure(
141143
new ElasticsearchStatusException(
142-
"Cannot stop allocation for model [{}] as it is still referenced by ingest processors",
144+
"Cannot stop deployment for model [{}] as it is referenced by ingest processors; use force to stop the deployment",
143145
RestStatus.CONFLICT,
144146
modelId
145147
)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/inference/RestStopTrainedModelDeploymentAction.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,21 @@ public List<Route> routes() {
3838
@Override
3939
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
4040
String modelId = restRequest.param(TrainedModelConfig.MODEL_ID.getPreferredName());
41-
StopTrainedModelDeploymentAction.Request request = new StopTrainedModelDeploymentAction.Request(modelId);
41+
StopTrainedModelDeploymentAction.Request request;
42+
if (restRequest.hasContentOrSourceParam()) {
43+
request = StopTrainedModelDeploymentAction.Request.parseRequest(modelId, restRequest.contentOrSourceParamParser());
44+
} else {
45+
request = new StopTrainedModelDeploymentAction.Request(modelId);
46+
request.setAllowNoMatch(
47+
restRequest.paramAsBoolean(
48+
StopTrainedModelDeploymentAction.Request.ALLOW_NO_MATCH.getPreferredName(),
49+
request.isAllowNoMatch()
50+
)
51+
);
52+
request.setForce(
53+
restRequest.paramAsBoolean(StopTrainedModelDeploymentAction.Request.FORCE.getPreferredName(), request.isForce())
54+
);
55+
}
4256
return channel -> client.execute(StopTrainedModelDeploymentAction.INSTANCE, request, new RestToXContentListener<>(channel));
4357
}
4458
}

0 commit comments

Comments
 (0)