Skip to content

Commit 8db1c9c

Browse files
author
Hendrik Muhs
authored
[7.x][Transform] stop transform regardless of transform nodes (#69419) (#69526)
allow stop transform to stop a transform task if its waiting for assignment(e.g. if the cluster lacks a transform node) fixes #69260
1 parent 78e0b71 commit 8db1c9c

File tree

5 files changed

+171
-19
lines changed

5 files changed

+171
-19
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.transform.action;
9+
10+
import java.util.Collections;
11+
import java.util.Set;
12+
13+
/**
14+
* Record of transform tasks and their current persistent task state.
15+
*
16+
* This class is aimed to be used by start/stop and stats action.
17+
*/
18+
public final class TransformNodeAssignments {
19+
20+
// set of nodes where requested transforms are executed on
21+
private final Set<String> executorNodes;
22+
// set of transforms that are currently assigned to a node
23+
private final Set<String> assigned;
24+
// set of transforms that currently wait for node assignment
25+
private final Set<String> waitingForAssignment;
26+
// set of transforms that have neither a task nor wait for assignment, so considered stopped
27+
private final Set<String> stopped;
28+
29+
TransformNodeAssignments(
30+
final Set<String> executorNodes,
31+
final Set<String> assigned,
32+
final Set<String> waitingForAssignment,
33+
final Set<String> stopped
34+
) {
35+
this.executorNodes = Collections.unmodifiableSet(executorNodes);
36+
this.assigned = Collections.unmodifiableSet(assigned);
37+
this.waitingForAssignment = Collections.unmodifiableSet(waitingForAssignment);
38+
this.stopped = Collections.unmodifiableSet(stopped);
39+
}
40+
41+
/*
42+
* Get nodes where (requested) transforms are executed.
43+
*/
44+
public Set<String> getExecutorNodes() {
45+
return executorNodes;
46+
}
47+
48+
/*
49+
* Get transforms which have tasks currently assigned to a node
50+
*/
51+
public Set<String> getAssigned() {
52+
return assigned;
53+
}
54+
55+
/*
56+
* Get transforms which are currently waiting to be assigned to a node
57+
*/
58+
public Set<String> getWaitingForAssignment() {
59+
return waitingForAssignment;
60+
}
61+
62+
/*
63+
* Get transforms which have no tasks, which means they are stopped
64+
*/
65+
public Set<String> getStopped() {
66+
return stopped;
67+
}
68+
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.HashSet;
1616
import java.util.List;
1717
import java.util.Set;
18+
import java.util.stream.Collectors;
1819

1920
public final class TransformNodes {
2021

@@ -27,9 +28,11 @@ private TransformNodes() {}
2728
* @param clusterState State
2829
* @return The executor nodes
2930
*/
30-
public static String[] transformTaskNodes(List<String> transformIds, ClusterState clusterState) {
31+
public static TransformNodeAssignments transformTaskNodes(List<String> transformIds, ClusterState clusterState) {
3132

3233
Set<String> executorNodes = new HashSet<>();
34+
Set<String> assigned = new HashSet<>();
35+
Set<String> waitingForAssignment = new HashSet<>();
3336

3437
PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState);
3538

@@ -38,14 +41,23 @@ public static String[] transformTaskNodes(List<String> transformIds, ClusterStat
3841

3942
Collection<PersistentTasksCustomMetadata.PersistentTask<?>> tasks = tasksMetadata.findTasks(
4043
TransformField.TASK_NAME,
41-
t -> transformIdsSet.contains(t.getId()) && t.isAssigned()
44+
t -> transformIdsSet.contains(t.getId())
4245
);
4346

4447
for (PersistentTasksCustomMetadata.PersistentTask<?> task : tasks) {
45-
executorNodes.add(task.getExecutorNode());
48+
if (task.isAssigned()) {
49+
executorNodes.add(task.getExecutorNode());
50+
assigned.add(task.getId());
51+
} else {
52+
waitingForAssignment.add(task.getId());
53+
}
4654
}
4755
}
4856

49-
return executorNodes.toArray(new String[0]);
57+
Set<String> stopped = transformIds.stream()
58+
.filter(id -> (assigned.contains(id) || waitingForAssignment.contains(id)) == false)
59+
.collect(Collectors.toSet());
60+
61+
return new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped);
5062
}
5163
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,9 @@ protected void doExecute(Task task, Request request, ActionListener<Response> fi
138138
ActionListener.wrap(hitsAndIds -> {
139139
request.setExpandedIds(hitsAndIds.v2());
140140
final ClusterState state = clusterService.state();
141-
request.setNodes(TransformNodes.transformTaskNodes(hitsAndIds.v2(), state));
141+
TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(hitsAndIds.v2(), state);
142+
// TODO: if empty the request is send to all nodes(benign but superfluous)
143+
request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0]));
142144
super.doExecute(task, request, ActionListener.wrap(response -> {
143145
PersistentTasksCustomMetadata tasksInProgress = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
144146
if (tasksInProgress != null) {

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java

+45-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.action.FailedNodeException;
1717
import org.elasticsearch.action.TaskOperationFailure;
1818
import org.elasticsearch.action.support.ActionFilters;
19+
import org.elasticsearch.action.support.GroupedActionListener;
1920
import org.elasticsearch.action.support.tasks.TransportTasksAction;
2021
import org.elasticsearch.client.Client;
2122
import org.elasticsearch.cluster.ClusterState;
@@ -158,6 +159,7 @@ static Tuple<Set<String>, Set<String>> findTasksWithoutConfig(ClusterState state
158159
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
159160
final ClusterState state = clusterService.state();
160161
final DiscoveryNodes nodes = state.nodes();
162+
161163
if (nodes.isLocalNodeElectedMaster() == false) {
162164
// Delegates stop transform to elected master node so it becomes the coordinating node.
163165
if (nodes.getMasterNode() == null) {
@@ -185,15 +187,29 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
185187
ActionListener.wrap(hitsAndIds -> {
186188
validateTaskState(state, hitsAndIds.v2(), request.isForce());
187189
request.setExpandedIds(new HashSet<>(hitsAndIds.v2()));
188-
request.setNodes(TransformNodes.transformTaskNodes(hitsAndIds.v2(), state));
189-
super.doExecute(task, request, finalListener);
190+
final TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(hitsAndIds.v2(), state);
191+
192+
final ActionListener<Response> doExecuteListener;
193+
if (transformNodeAssignments.getWaitingForAssignment().size() > 0) {
194+
doExecuteListener = cancelTransformTasksWithNoAssignment(finalListener, transformNodeAssignments);
195+
} else {
196+
doExecuteListener = finalListener;
197+
}
198+
199+
if (transformNodeAssignments.getExecutorNodes().size() > 0) {
200+
request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0]));
201+
super.doExecute(task, request, doExecuteListener);
202+
} else {
203+
doExecuteListener.onResponse(new Response(true));
204+
}
190205
}, e -> {
191206
if (e instanceof ResourceNotFoundException) {
192207
Tuple<Set<String>, Set<String>> runningTasksAndNodes = findTasksWithoutConfig(state, request.getId());
193208
if (runningTasksAndNodes.v1().isEmpty()) {
194209
listener.onFailure(e);
195210
// found transforms without a config
196211
} else if (request.isForce()) {
212+
// TODO: handle tasks waiting for assignment
197213
request.setExpandedIds(runningTasksAndNodes.v1());
198214
request.setNodes(runningTasksAndNodes.v2().toArray(new String[0]));
199215
super.doExecute(task, request, finalListener);
@@ -457,4 +473,31 @@ private void waitForTransformStopped(
457473
listener.onFailure(e);
458474
}));
459475
}
476+
477+
private ActionListener<Response> cancelTransformTasksWithNoAssignment(
478+
final ActionListener<Response> finalListener,
479+
final TransformNodeAssignments transformNodeAssignments
480+
) {
481+
final ActionListener<Response> doExecuteListener = ActionListener.wrap(response -> {
482+
GroupedActionListener<PersistentTask<?>> groupedListener = new GroupedActionListener<>(
483+
ActionListener.wrap(r -> { finalListener.onResponse(response); }, finalListener::onFailure),
484+
transformNodeAssignments.getWaitingForAssignment().size()
485+
);
486+
487+
for (String unassignedTaskId : transformNodeAssignments.getWaitingForAssignment()) {
488+
persistentTasksService.sendRemoveRequest(unassignedTaskId, groupedListener);
489+
}
490+
491+
}, e -> {
492+
GroupedActionListener<PersistentTask<?>> groupedListener = new GroupedActionListener<>(
493+
ActionListener.wrap(r -> { finalListener.onFailure(e); }, finalListener::onFailure),
494+
transformNodeAssignments.getWaitingForAssignment().size()
495+
);
496+
497+
for (String unassignedTaskId : transformNodeAssignments.getWaitingForAssignment()) {
498+
persistentTasksService.sendRemoveRequest(unassignedTaskId, groupedListener);
499+
}
500+
});
501+
return doExecuteListener;
502+
}
460503
}

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java

+39-12
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121

2222
import java.util.Arrays;
2323
import java.util.Collections;
24-
import java.util.HashSet;
25-
import java.util.Set;
2624

2725
public class TransformNodesTests extends ESTestCase {
2826

@@ -32,6 +30,7 @@ public void testTransformNodes() {
3230
String transformIdFailed = "df-id-failed";
3331
String transformIdBaz = "df-id-baz";
3432
String transformIdOther = "df-id-other";
33+
String transformIdStopped = "df-id-stopped";
3534

3635
PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
3736
tasksBuilder.addTask(
@@ -91,21 +90,49 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) {
9190
.build();
9291

9392
// don't ask for transformIdOther
94-
String[] nodes = TransformNodes.transformTaskNodes(
95-
Arrays.asList(transformIdFoo, transformIdBar, transformIdFailed, transformIdBaz),
93+
TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(
94+
Arrays.asList(transformIdFoo, transformIdBar, transformIdFailed, transformIdBaz, transformIdStopped),
9695
cs
9796
);
98-
assertEquals(2, nodes.length);
99-
Set<String> nodesSet = new HashSet<>(Arrays.asList(nodes));
100-
assertTrue(nodesSet.contains("node-1"));
101-
assertTrue(nodesSet.contains("node-2"));
102-
assertFalse(nodesSet.contains(null));
103-
assertFalse(nodesSet.contains("node-3"));
97+
assertEquals(2, transformNodeAssignments.getExecutorNodes().size());
98+
assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1"));
99+
assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-2"));
100+
assertFalse(transformNodeAssignments.getExecutorNodes().contains(null));
101+
assertFalse(transformNodeAssignments.getExecutorNodes().contains("node-3"));
102+
assertEquals(1, transformNodeAssignments.getWaitingForAssignment().size());
103+
assertTrue(transformNodeAssignments.getWaitingForAssignment().contains(transformIdFailed));
104+
assertEquals(3, transformNodeAssignments.getAssigned().size());
105+
assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo));
106+
assertTrue(transformNodeAssignments.getAssigned().contains(transformIdBar));
107+
assertTrue(transformNodeAssignments.getAssigned().contains(transformIdBaz));
108+
assertFalse(transformNodeAssignments.getAssigned().contains(transformIdFailed));
109+
assertEquals(1, transformNodeAssignments.getStopped().size());
110+
assertTrue(transformNodeAssignments.getStopped().contains(transformIdStopped));
111+
112+
transformNodeAssignments = TransformNodes.transformTaskNodes(
113+
Arrays.asList(transformIdFoo, transformIdFailed),
114+
cs
115+
);
116+
117+
assertEquals(1, transformNodeAssignments.getExecutorNodes().size());
118+
assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1"));
119+
assertEquals(1, transformNodeAssignments.getWaitingForAssignment().size());
120+
assertTrue(transformNodeAssignments.getWaitingForAssignment().contains(transformIdFailed));
121+
assertEquals(1, transformNodeAssignments.getAssigned().size());
122+
assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo));
123+
assertFalse(transformNodeAssignments.getAssigned().contains(transformIdFailed));
124+
assertEquals(0, transformNodeAssignments.getStopped().size());
104125
}
105126

106127
public void testTransformNodes_NoTasks() {
107128
ClusterState emptyState = ClusterState.builder(new ClusterName("_name")).build();
108-
String[] nodes = TransformNodes.transformTaskNodes(Collections.singletonList("df-id"), emptyState);
109-
assertEquals(0, nodes.length);
129+
TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(
130+
Collections.singletonList("df-id"),
131+
emptyState
132+
);
133+
134+
assertEquals(0, transformNodeAssignments.getExecutorNodes().size());
135+
assertEquals(1, transformNodeAssignments.getStopped().size());
136+
assertTrue(transformNodeAssignments.getStopped().contains("df-id"));
110137
}
111138
}

0 commit comments

Comments
 (0)