Skip to content

Commit 0d7e4c2

Browse files
authored
[ML] fix ml API cancellation when http connection is closed (#88616)
API cancellation was erroneously using the node-name for the parent task id. Task ids are constructed via the node ID, not name. This fixes that bug. related to: - #88030 - #88142 - #88009
1 parent 430b419 commit 0d7e4c2

14 files changed

+15
-15
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEvaluateDataFrameAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ protected void doExecute(
7474
EvaluateDataFrameAction.Request request,
7575
ActionListener<EvaluateDataFrameAction.Response> listener
7676
) {
77-
TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
77+
TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
7878
ActionListener<List<Void>> resultsListener = ActionListener.wrap(unused -> {
7979
EvaluateDataFrameAction.Response response = new EvaluateDataFrameAction.Response(
8080
request.getEvaluation().getName(),

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportExplainDataFrameAnalyticsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ private void explain(
125125
ExplainDataFrameAnalyticsAction.Request request,
126126
ActionListener<ExplainDataFrameAnalyticsAction.Response> listener
127127
) {
128-
TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
128+
TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
129129
final ExtractedFieldsDetectorFactory extractedFieldsDetectorFactory = new ExtractedFieldsDetectorFactory(
130130
new ParentTaskAssigningClient(client, parentTaskId)
131131
);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public TransportGetCategoriesAction(
4646

4747
@Override
4848
protected void doExecute(Task task, GetCategoriesAction.Request request, ActionListener<GetCategoriesAction.Response> listener) {
49-
TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
49+
TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
5050
jobManager.jobExists(request.getJobId(), parentTaskId, ActionListener.wrap(jobExists -> {
5151
Integer from = request.getPageParams() != null ? request.getPageParams().getFrom() : null;
5252
Integer size = request.getPageParams() != null ? request.getPageParams().getSize() : null;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ protected void doExecute(
158158
GetDataFrameAnalyticsStatsAction.Request request,
159159
ActionListener<GetDataFrameAnalyticsStatsAction.Response> listener
160160
) {
161-
TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
161+
TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
162162
logger.debug("Get stats for data frame analytics [{}]", request.getId());
163163

164164
ActionListener<GetDataFrameAnalyticsAction.Response> getResponseListener = ActionListener.wrap(getResponse -> {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ protected void masterOperation(
6161
ClusterState state,
6262
ActionListener<GetDatafeedsAction.Response> listener
6363
) {
64-
TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
64+
TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
6565
logger.debug("Get datafeed '{}'", request.getDatafeedId());
6666

6767
datafeedManager.getDatafeeds(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
6868
ClusterState state = clusterService.state();
6969
final PersistentTasksCustomMetadata tasksInProgress = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
7070
final Response.Builder responseBuilder = new Response.Builder();
71-
final TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
71+
final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
7272

7373
// 5. Build response
7474
ActionListener<GetDatafeedRunningStateAction.Response> runtimeStateListener = ActionListener.wrap(runtimeStateResponse -> {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobModelSnapshotsUpgradeStatsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
7676
logger.debug(() -> format("[%s] get stats for model snapshot [%s] upgrades", request.getJobId(), request.getSnapshotId()));
7777
final PersistentTasksCustomMetadata tasksInProgress = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
7878
final Collection<PersistentTasksCustomMetadata.PersistentTask<?>> snapshotUpgrades = MlTasks.snapshotUpgradeTasks(tasksInProgress);
79-
final TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
79+
final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
8080

8181
// 2. Now that we have the job IDs, find the relevant model snapshot upgrades
8282
ActionListener<List<Job.Builder>> expandIdsListener = ActionListener.wrap(jobs -> {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ protected void masterOperation(
6969
ClusterState state,
7070
ActionListener<GetJobsAction.Response> listener
7171
) {
72-
TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
72+
TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
7373
logger.debug("Get job '{}'", request.getJobId());
7474
jobManager.expandJobBuilders(
7575
request.getJobId(),

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public TransportGetJobsStatsAction(
9595
@Override
9696
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> finalListener) {
9797
logger.debug("Get stats for job [{}]", request.getJobId());
98-
TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
98+
TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
9999

100100
ClusterState state = clusterService.state();
101101
PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
@@ -144,7 +144,7 @@ protected void taskOperation(
144144
JobTask task,
145145
ActionListener<QueryPage<JobStats>> listener
146146
) {
147-
TaskId parentTaskId = new TaskId(clusterService.getNodeName(), actionTask.getId());
147+
TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), actionTask.getId());
148148
String jobId = task.getJobId();
149149
ClusterState state = clusterService.state();
150150
PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ protected void doExecute(
5353
GetModelSnapshotsAction.Request request,
5454
ActionListener<GetModelSnapshotsAction.Response> listener
5555
) {
56-
TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
56+
TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
5757
logger.debug(
5858
() -> format(
5959
"Get model snapshots for job %s snapshot ID %s. from = %s, size = %s start = '%s', end='%s', sort=%s descending=%s",

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportInferTrainedModelDeploymentAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ protected void doExecute(
7272
InferTrainedModelDeploymentAction.Request request,
7373
ActionListener<InferTrainedModelDeploymentAction.Response> listener
7474
) {
75-
TaskId taskId = new TaskId(clusterService.getNodeName(), task.getId());
75+
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
7676
final String deploymentId = request.getDeploymentId();
7777
// We need to check whether there is at least an assigned task here, otherwise we cannot redirect to the
7878
// node running the job task.

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportInternalInferModelAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public TransportInternalInferModelAction(
9696
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
9797

9898
Response.Builder responseBuilder = Response.builder();
99-
TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
99+
TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
100100

101101
if (MachineLearningField.ML_API_FEATURE.check(licenseState)) {
102102
responseBuilder.setLicensed(true);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDataFrameAnalyticsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
102102
}
103103

104104
void preview(Task task, DataFrameAnalyticsConfig config, ActionListener<Response> listener) {
105-
final TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
105+
final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
106106
final ExtractedFieldsDetectorFactory extractedFieldsDetectorFactory = new ExtractedFieldsDetectorFactory(
107107
new ParentTaskAssigningClient(client, parentTaskId)
108108
);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public TransportPreviewDatafeedAction(
9292

9393
@Override
9494
protected void doExecute(Task task, PreviewDatafeedAction.Request request, ActionListener<PreviewDatafeedAction.Response> listener) {
95-
TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
95+
TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
9696
ActionListener<DatafeedConfig> datafeedConfigActionListener = ActionListener.wrap(datafeedConfig -> {
9797
if (request.getJobConfig() != null) {
9898
previewDatafeed(parentTaskId, datafeedConfig, request.getJobConfig().build(new Date()), request, listener);

0 commit comments

Comments
 (0)