
Commit a5204c1

[ML] Fixes for stop datafeed edge cases (#49284)
The following edge cases were fixed:

1. A request to force-stop a stopping datafeed is no longer ignored. Force-stop is an important recovery mechanism if normal stop doesn't work for some reason, and needs to operate on a datafeed in any state other than stopped.
2. If the node that a datafeed is running on is removed from the cluster during a normal stop then the stop request is retried (and will likely succeed on this retry by simply cancelling the persistent task for the affected datafeed).
3. If there are multiple simultaneous force-stop requests for the same datafeed we no longer fail the one that is processed second. The previous behaviour was wrong, as stopping a stopped datafeed is not an error, so stopping a datafeed twice simultaneously should not be either.

Backport of #49191
1 parent 9c00648 commit a5204c1
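For context, the stop requests discussed above are instances of StopDatafeedAction.Request handled by the transport action changed below. A minimal sketch of how a force-stop might be issued from a Java client, assuming a Request(String) constructor and a setForce(boolean) setter to match the isForce() accessor that appears in the diff (names not shown in the diff are assumptions, not part of this commit):

// Illustrative sketch only - not part of this commit.
// Assumes StopDatafeedAction.Request exposes setForce(boolean) alongside the
// isForce() accessor used below, and that Response carries a "stopped" flag.
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;

class ForceStopExample {
    static void forceStop(Client client, String datafeedId) {
        StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedId);
        request.setForce(true); // force-stop must work for any state other than STOPPED
        client.execute(StopDatafeedAction.INSTANCE, request, ActionListener.wrap(
                response -> { /* expected to report the datafeed as stopped */ },
                e -> { /* with this fix, a concurrent force-stop no longer surfaces here */ }));
    }
}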

8 files changed, +284 -39 lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

+1
@@ -132,6 +132,7 @@ public static JobState getJobStateModifiedForReassignments(@Nullable PersistentT
 
     public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
         PersistentTasksCustomMetaData.PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
+        // TODO: report (task != null && task.getState() == null) as STARTING in version 8, and fix side effects
         if (task != null && task.getState() != null) {
             return (DatafeedState) task.getState();
         } else {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

+1 -1
@@ -576,7 +576,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
             }
         } else {
             autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) ->
-                    new BlackHoleAutodetectProcess(job.getId());
+                    new BlackHoleAutodetectProcess(job.getId(), onProcessCrash);
             // factor of 1.0 makes renormalization a no-op
             normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0);
             analyticsProcessFactory = (jobId, analyticsProcessConfig, state, executorService, onProcessCrash) -> null;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java

+53 -22
@@ -21,6 +21,7 @@
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.discovery.MasterNotDiscoveredException;
+import org.elasticsearch.persistent.PersistentTasksClusterService;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.tasks.Task;
@@ -34,6 +35,7 @@
 import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -68,32 +70,46 @@ public TransportStopDatafeedAction(TransportService transportService, ThreadPool
      * @param tasks Persistent task meta data
      * @param startedDatafeedIds Started datafeed ids are added to this list
      * @param stoppingDatafeedIds Stopping datafeed ids are added to this list
+     * @param notStoppedDatafeedIds Datafeed ids are added to this list for all datafeeds that are not stopped
      */
-    static void sortDatafeedIdsByTaskState(Set<String> expandedDatafeedIds,
+    static void sortDatafeedIdsByTaskState(Collection<String> expandedDatafeedIds,
                                            PersistentTasksCustomMetaData tasks,
                                            List<String> startedDatafeedIds,
-                                           List<String> stoppingDatafeedIds) {
+                                           List<String> stoppingDatafeedIds,
+                                           List<String> notStoppedDatafeedIds) {
 
         for (String expandedDatafeedId : expandedDatafeedIds) {
             addDatafeedTaskIdAccordingToState(expandedDatafeedId, MlTasks.getDatafeedState(expandedDatafeedId, tasks),
-                startedDatafeedIds, stoppingDatafeedIds);
+                startedDatafeedIds, stoppingDatafeedIds, notStoppedDatafeedIds);
         }
     }
 
     private static void addDatafeedTaskIdAccordingToState(String datafeedId,
                                                           DatafeedState datafeedState,
                                                           List<String> startedDatafeedIds,
-                                                          List<String> stoppingDatafeedIds) {
+                                                          List<String> stoppingDatafeedIds,
+                                                          List<String> notStoppedDatafeedIds) {
         switch (datafeedState) {
+            case STARTING:
+                // The STARTING state is not used anywhere at present, so this should never happen.
+                // At present datafeeds that have a persistent task that hasn't yet been assigned
+                // a state are reported as STOPPED (which is not great). It could be considered a
+                // breaking change to introduce the STARTING state though, so let's aim to do it in
+                // version 8. Also consider treating STARTING like STARTED for stop API behaviour.
+                notStoppedDatafeedIds.add(datafeedId);
+                break;
             case STARTED:
                 startedDatafeedIds.add(datafeedId);
+                notStoppedDatafeedIds.add(datafeedId);
                 break;
             case STOPPED:
                 break;
             case STOPPING:
                 stoppingDatafeedIds.add(datafeedId);
+                notStoppedDatafeedIds.add(datafeedId);
                 break;
             default:
+                assert false : "Unexpected datafeed state " + datafeedState;
                 break;
         }
     }
@@ -118,17 +134,18 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
 
                     List<String> startedDatafeeds = new ArrayList<>();
                     List<String> stoppingDatafeeds = new ArrayList<>();
-                    sortDatafeedIdsByTaskState(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds);
+                    List<String> notStoppedDatafeeds = new ArrayList<>();
+                    sortDatafeedIdsByTaskState(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds, notStoppedDatafeeds);
                     if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) {
                         listener.onResponse(new StopDatafeedAction.Response(true));
                         return;
                     }
                     request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()]));
 
                     if (request.isForce()) {
-                        forceStopDatafeed(request, listener, tasks, startedDatafeeds);
+                        forceStopDatafeed(request, listener, tasks, notStoppedDatafeeds);
                     } else {
-                        normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds);
+                        normalStopDatafeed(task, request, listener, tasks, nodes, startedDatafeeds, stoppingDatafeeds);
                     }
                 },
                 listener::onFailure
@@ -137,20 +154,20 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
     }
 
     private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, ActionListener<StopDatafeedAction.Response> listener,
-                                    PersistentTasksCustomMetaData tasks,
+                                    PersistentTasksCustomMetaData tasks, DiscoveryNodes nodes,
                                     List<String> startedDatafeeds, List<String> stoppingDatafeeds) {
-        Set<String> executorNodes = new HashSet<>();
+        final Set<String> executorNodes = new HashSet<>();
         for (String datafeedId : startedDatafeeds) {
             PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
             if (datafeedTask == null) {
                 // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
                 String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found.";
                 assert datafeedTask != null : msg;
                 logger.error(msg);
-            } else if (datafeedTask.isAssigned()) {
+            } else if (PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes) == false) {
                 executorNodes.add(datafeedTask.getExecutorNode());
             } else {
-                // This is the easy case - the datafeed is not currently assigned to a node,
+                // This is the easy case - the datafeed is not currently assigned to a valid node,
                 // so can be gracefully stopped simply by removing its persistent task. (Usually
                 // a graceful stop cannot be achieved by simply removing the persistent task, but
                 // if the datafeed has no running code then graceful/forceful are the same.)
@@ -171,48 +188,62 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A
 
         ActionListener<StopDatafeedAction.Response> finalListener = ActionListener.wrap(
                 r -> waitForDatafeedStopped(allDataFeedsToWaitFor, request, r, listener),
-                listener::onFailure);
+                e -> {
+                    if (ExceptionsHelper.unwrapCause(e) instanceof FailedNodeException) {
+                        // A node has dropped out of the cluster since we started executing the requests.
+                        // Since stopping an already stopped datafeed is not an error we can try again.
+                        // The datafeeds that were running on the node that dropped out of the cluster
+                        // will just have their persistent tasks cancelled. Datafeeds that were stopped
+                        // by the previous attempt will be noops in the subsequent attempt.
+                        doExecute(task, request, listener);
+                    } else {
+                        listener.onFailure(e);
+                    }
+                });
 
         super.doExecute(task, request, finalListener);
     }
 
     private void forceStopDatafeed(final StopDatafeedAction.Request request, final ActionListener<StopDatafeedAction.Response> listener,
-                                   PersistentTasksCustomMetaData tasks, final List<String> startedDatafeeds) {
+                                   PersistentTasksCustomMetaData tasks, final List<String> notStoppedDatafeeds) {
         final AtomicInteger counter = new AtomicInteger();
-        final AtomicArray<Exception> failures = new AtomicArray<>(startedDatafeeds.size());
+        final AtomicArray<Exception> failures = new AtomicArray<>(notStoppedDatafeeds.size());
 
-        for (String datafeedId : startedDatafeeds) {
+        for (String datafeedId : notStoppedDatafeeds) {
            PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
            if (datafeedTask != null) {
                persistentTasksService.sendRemoveRequest(datafeedTask.getId(),
                        new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
                            @Override
                            public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
-                               if (counter.incrementAndGet() == startedDatafeeds.size()) {
+                               if (counter.incrementAndGet() == notStoppedDatafeeds.size()) {
                                    sendResponseOrFailure(request.getDatafeedId(), listener, failures);
                                }
                            }
 
                            @Override
                            public void onFailure(Exception e) {
                                final int slot = counter.incrementAndGet();
-                               if ((ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException &&
-                                   Strings.isAllOrWildcard(new String[]{request.getDatafeedId()})) == false) {
+                               // We validated that the datafeed names supplied in the request existed when we started processing the action.
+                               // If the related tasks don't exist at this point then they must have been stopped by a simultaneous stop request.
+                               // This is not an error.
+                               if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
                                    failures.set(slot - 1, e);
                                }
-                               if (slot == startedDatafeeds.size()) {
+                               if (slot == notStoppedDatafeeds.size()) {
                                    sendResponseOrFailure(request.getDatafeedId(), listener, failures);
                                }
                            }
                        });
            } else {
-               // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
+               // This should not happen, because startedDatafeeds and stoppingDatafeeds
+               // were derived from the same tasks that were passed to this method
               String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found.";
               assert datafeedTask != null : msg;
               logger.error(msg);
               final int slot = counter.incrementAndGet();
               failures.set(slot - 1, new RuntimeException(msg));
-              if (slot == startedDatafeeds.size()) {
+              if (slot == notStoppedDatafeeds.size()) {
                   sendResponseOrFailure(request.getDatafeedId(), listener, failures);
               }
           }
@@ -313,7 +344,7 @@ protected StopDatafeedAction.Response newResponse(StopDatafeedAction.Request req
                     .convertToElastic(failedNodeExceptions.get(0));
         } else {
             // This can happen we the actual task in the node no longer exists,
-            // which means the datafeed(s) have already been closed.
+            // which means the datafeed(s) have already been stopped.
             return new StopDatafeedAction.Response(true);
         }
     }

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java

+16 -2
@@ -21,12 +21,15 @@
 
 import java.io.IOException;
 import java.time.ZonedDateTime;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 /**
  * A placeholder class simulating the actions of the native Autodetect process.
@@ -37,16 +40,21 @@
  */
 public class BlackHoleAutodetectProcess implements AutodetectProcess {
 
+    public static final String MAGIC_FAILURE_VALUE = "253402300799";
+    public static final String MAGIC_FAILURE_VALUE_AS_DATE = "9999-12-31 23:59:59";
+
     private static final String FLUSH_ID = "flush-1";
 
     private final String jobId;
     private final ZonedDateTime startTime;
     private final BlockingQueue<AutodetectResult> results = new LinkedBlockingDeque<>();
+    private final Consumer<String> onProcessCrash;
     private volatile boolean open = true;
 
-    public BlackHoleAutodetectProcess(String jobId) {
+    public BlackHoleAutodetectProcess(String jobId, Consumer<String> onProcessCrash) {
         this.jobId = jobId;
         startTime = ZonedDateTime.now();
+        this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
     }
 
     @Override
@@ -59,7 +67,13 @@ public boolean isReady() {
     }
 
     @Override
-    public void writeRecord(String[] record) throws IOException {
+    public void writeRecord(String[] record) {
+        if (Arrays.asList(record).contains(MAGIC_FAILURE_VALUE)) {
+            open = false;
+            onProcessCrash.accept("simulated failure");
+            AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null);
+            results.add(result);
+        }
     }
 
     @Override
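The magic failure value above gives tests a deterministic way to make the black hole process behave like a crashed native process. A minimal sketch of how that hook could be driven, using only the constructor, MAGIC_FAILURE_VALUE and writeRecord from the diff above (the job id and the latch are illustrative, not part of this commit):

// Sketch only - exercises the simulated-crash hook added above.
import java.util.concurrent.CountDownLatch;
import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess;

class SimulatedCrashExample {
    static void triggerSimulatedCrash() throws InterruptedException {
        CountDownLatch crashed = new CountDownLatch(1);
        BlackHoleAutodetectProcess process =
                new BlackHoleAutodetectProcess("test-job", reason -> crashed.countDown());

        // Any record containing the magic timestamp flips the process to closed,
        // invokes onProcessCrash("simulated failure") and queues one empty result.
        process.writeRecord(new String[] { BlackHoleAutodetectProcess.MAGIC_FAILURE_VALUE });
        crashed.await();
    }
}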

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java

+10 -5
@@ -14,7 +14,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 
 public class TransportStopDatafeedActionTests extends ESTestCase {
@@ -27,17 +26,21 @@ public void testSortDatafeedIdsByTaskState_GivenDatafeedId() {
 
         List<String> startedDatafeeds = new ArrayList<>();
         List<String> stoppingDatafeeds = new ArrayList<>();
+        List<String> notStoppedDatafeeds = new ArrayList<>();
         TransportStopDatafeedAction.sortDatafeedIdsByTaskState(
-                Collections.singleton("datafeed_1"), tasks, startedDatafeeds, stoppingDatafeeds);
+                Collections.singleton("datafeed_1"), tasks, startedDatafeeds, stoppingDatafeeds, notStoppedDatafeeds);
         assertEquals(Collections.singletonList("datafeed_1"), startedDatafeeds);
         assertEquals(Collections.emptyList(), stoppingDatafeeds);
+        assertEquals(Collections.singletonList("datafeed_1"), notStoppedDatafeeds);
 
         startedDatafeeds.clear();
         stoppingDatafeeds.clear();
+        notStoppedDatafeeds.clear();
         TransportStopDatafeedAction.sortDatafeedIdsByTaskState(
-                Collections.singleton("datafeed_2"), tasks, startedDatafeeds, stoppingDatafeeds);
+                Collections.singleton("datafeed_2"), tasks, startedDatafeeds, stoppingDatafeeds, notStoppedDatafeeds);
         assertEquals(Collections.emptyList(), startedDatafeeds);
         assertEquals(Collections.emptyList(), stoppingDatafeeds);
+        assertEquals(Collections.emptyList(), notStoppedDatafeeds);
     }
 
     public void testSortDatafeedIdsByTaskState_GivenAll() {
@@ -50,15 +53,17 @@ public void testSortDatafeedIdsByTaskState_GivenAll() {
 
         List<String> startedDatafeeds = new ArrayList<>();
         List<String> stoppingDatafeeds = new ArrayList<>();
+        List<String> notStoppedDatafeeds = new ArrayList<>();
         TransportStopDatafeedAction.sortDatafeedIdsByTaskState(
-                new HashSet<>(Arrays.asList("datafeed_1", "datafeed_2", "datafeed_3")), tasks, startedDatafeeds, stoppingDatafeeds);
+                Arrays.asList("datafeed_1", "datafeed_2", "datafeed_3"), tasks, startedDatafeeds, stoppingDatafeeds, notStoppedDatafeeds);
         assertEquals(Collections.singletonList("datafeed_1"), startedDatafeeds);
         assertEquals(Collections.singletonList("datafeed_3"), stoppingDatafeeds);
+        assertEquals(Arrays.asList("datafeed_1", "datafeed_3"), notStoppedDatafeeds);
 
         startedDatafeeds.clear();
         stoppingDatafeeds.clear();
         TransportStopDatafeedAction.sortDatafeedIdsByTaskState(Collections.singleton("datafeed_2"), tasks, startedDatafeeds,
-                stoppingDatafeeds);
+                stoppingDatafeeds, notStoppedDatafeeds);
         assertEquals(Collections.emptyList(), startedDatafeeds);
         assertEquals(Collections.emptyList(), stoppingDatafeeds);
     }
