Skip to content

Commit 546a3b8

Browse files
authored
[ML] Fixes for stop datafeed edge cases (#49290)
The following edge cases were fixed: 1. A request to force-stop a stopping datafeed is no longer ignored. Force-stop is an important recovery mechanism if normal stop doesn't work for some reason, and needs to operate on a datafeed in any state other than stopped. 2. If the node that a datafeed is running on is removed from the cluster during a normal stop then the stop request is retried (and will likely succeed on this retry by simply cancelling the persistent task for the affected datafeed). 3. If there are multiple simultaneous force-stop requests for the same datafeed we no longer fail the one that is processed second. The previous behaviour was wrong as stopping a stopped datafeed is not an error, so stopping a datafeed twice simultaneously should not be either. Backport of #49191
1 parent d69c4e1 commit 546a3b8

File tree

9 files changed

+294
-45
lines changed

9 files changed

+294
-45
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

+1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public static JobState getJobStateModifiedForReassignments(@Nullable PersistentT
111111

112112
public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
113113
PersistentTasksCustomMetaData.PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
114+
// TODO: report (task != null && task.getState() == null) as STARTING in version 8, and fix side effects
114115
if (task != null && task.getState() != null) {
115116
return (DatafeedState) task.getState();
116117
} else {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java

+4
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,8 @@ public static <T> T requireNonNull(T obj, String paramName) {
8686
}
8787
return obj;
8888
}
89+
90+
public static Throwable unwrapCause(Throwable t) {
91+
return org.elasticsearch.ExceptionsHelper.unwrapCause(t);
92+
}
8993
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
441441
}
442442
} else {
443443
autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) ->
444-
new BlackHoleAutodetectProcess(job.getId());
444+
new BlackHoleAutodetectProcess(job.getId(), onProcessCrash);
445445
// factor of 1.0 makes renormalization a no-op
446446
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0);
447447
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java

+60-28
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1818
import org.elasticsearch.cluster.node.DiscoveryNodes;
1919
import org.elasticsearch.cluster.service.ClusterService;
20-
import org.elasticsearch.common.Strings;
2120
import org.elasticsearch.common.inject.Inject;
2221
import org.elasticsearch.common.io.stream.StreamInput;
2322
import org.elasticsearch.common.settings.Settings;
2423
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2524
import org.elasticsearch.common.util.concurrent.AtomicArray;
2625
import org.elasticsearch.discovery.MasterNotDiscoveredException;
26+
import org.elasticsearch.persistent.PersistentTasksClusterService;
2727
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2828
import org.elasticsearch.persistent.PersistentTasksService;
2929
import org.elasticsearch.tasks.Task;
@@ -32,12 +32,14 @@
3232
import org.elasticsearch.xpack.core.ml.MlTasks;
3333
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
3434
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
35+
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
3536
import org.elasticsearch.xpack.ml.MachineLearning;
3637
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader;
3738
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
3839

3940
import java.io.IOException;
4041
import java.util.ArrayList;
42+
import java.util.Collection;
4143
import java.util.HashSet;
4244
import java.util.List;
4345
import java.util.Set;
@@ -72,32 +74,46 @@ public TransportStopDatafeedAction(Settings settings, TransportService transport
7274
* @param tasks Persistent task meta data
7375
* @param startedDatafeedIds Started datafeed ids are added to this list
7476
* @param stoppingDatafeedIds Stopping datafeed ids are added to this list
77+
* @param notStoppedDatafeedIds Datafeed ids are added to this list for all datafeeds that are not stopped
7578
*/
76-
static void sortDatafeedIdsByTaskState(Set<String> expandedDatafeedIds,
79+
static void sortDatafeedIdsByTaskState(Collection<String> expandedDatafeedIds,
7780
PersistentTasksCustomMetaData tasks,
7881
List<String> startedDatafeedIds,
79-
List<String> stoppingDatafeedIds) {
82+
List<String> stoppingDatafeedIds,
83+
List<String> notStoppedDatafeedIds) {
8084

8185
for (String expandedDatafeedId : expandedDatafeedIds) {
8286
addDatafeedTaskIdAccordingToState(expandedDatafeedId, MlTasks.getDatafeedState(expandedDatafeedId, tasks),
83-
startedDatafeedIds, stoppingDatafeedIds);
87+
startedDatafeedIds, stoppingDatafeedIds, notStoppedDatafeedIds);
8488
}
8589
}
8690

8791
private static void addDatafeedTaskIdAccordingToState(String datafeedId,
8892
DatafeedState datafeedState,
8993
List<String> startedDatafeedIds,
90-
List<String> stoppingDatafeedIds) {
94+
List<String> stoppingDatafeedIds,
95+
List<String> notStoppedDatafeedIds) {
9196
switch (datafeedState) {
97+
case STARTING:
98+
// The STARTING state is not used anywhere at present, so this should never happen.
99+
// At present datafeeds that have a persistent task that hasn't yet been assigned
100+
// a state are reported as STOPPED (which is not great). It could be considered a
101+
// breaking change to introduce the STARTING state though, so let's aim to do it in
102+
// version 8. Also consider treating STARTING like STARTED for stop API behaviour.
103+
notStoppedDatafeedIds.add(datafeedId);
104+
break;
92105
case STARTED:
93106
startedDatafeedIds.add(datafeedId);
107+
notStoppedDatafeedIds.add(datafeedId);
94108
break;
95109
case STOPPED:
96110
break;
97111
case STOPPING:
98112
stoppingDatafeedIds.add(datafeedId);
113+
notStoppedDatafeedIds.add(datafeedId);
99114
break;
100115
default:
116+
assert false : "Unexpected datafeed state " + datafeedState;
101117
break;
102118
}
103119
}
@@ -123,17 +139,18 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
123139

124140
List<String> startedDatafeeds = new ArrayList<>();
125141
List<String> stoppingDatafeeds = new ArrayList<>();
126-
sortDatafeedIdsByTaskState(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds);
142+
List<String> notStoppedDatafeeds = new ArrayList<>();
143+
sortDatafeedIdsByTaskState(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds, notStoppedDatafeeds);
127144
if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) {
128145
listener.onResponse(new StopDatafeedAction.Response(true));
129146
return;
130147
}
131148
request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()]));
132149

133150
if (request.isForce()) {
134-
forceStopDatafeed(request, listener, tasks, startedDatafeeds);
151+
forceStopDatafeed(request, listener, tasks, notStoppedDatafeeds);
135152
} else {
136-
normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds);
153+
normalStopDatafeed(task, request, listener, tasks, nodes, startedDatafeeds, stoppingDatafeeds);
137154
}
138155
},
139156
listener::onFailure
@@ -142,20 +159,20 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
142159
}
143160

144161
private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, ActionListener<StopDatafeedAction.Response> listener,
145-
PersistentTasksCustomMetaData tasks,
162+
PersistentTasksCustomMetaData tasks, DiscoveryNodes nodes,
146163
List<String> startedDatafeeds, List<String> stoppingDatafeeds) {
147-
Set<String> executorNodes = new HashSet<>();
164+
final Set<String> executorNodes = new HashSet<>();
148165
for (String datafeedId : startedDatafeeds) {
149166
PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
150167
if (datafeedTask == null) {
151168
// This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
152169
String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found.";
153170
assert datafeedTask != null : msg;
154171
logger.error(msg);
155-
} else if (datafeedTask.isAssigned()) {
172+
} else if (PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes) == false) {
156173
executorNodes.add(datafeedTask.getExecutorNode());
157174
} else {
158-
// This is the easy case - the datafeed is not currently assigned to a node,
175+
// This is the easy case - the datafeed is not currently assigned to a valid node,
159176
// so can be gracefully stopped simply by removing its persistent task. (Usually
160177
// a graceful stop cannot be achieved by simply removing the persistent task, but
161178
// if the datafeed has no running code then graceful/forceful are the same.)
@@ -176,48 +193,62 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A
176193

177194
ActionListener<StopDatafeedAction.Response> finalListener = ActionListener.wrap(
178195
r -> waitForDatafeedStopped(allDataFeedsToWaitFor, request, r, listener),
179-
listener::onFailure);
196+
e -> {
197+
if (ExceptionsHelper.unwrapCause(e) instanceof FailedNodeException) {
198+
// A node has dropped out of the cluster since we started executing the requests.
199+
// Since stopping an already stopped datafeed is not an error we can try again.
200+
// The datafeeds that were running on the node that dropped out of the cluster
201+
// will just have their persistent tasks cancelled. Datafeeds that were stopped
202+
// by the previous attempt will be noops in the subsequent attempt.
203+
doExecute(task, request, listener);
204+
} else {
205+
listener.onFailure(e);
206+
}
207+
});
180208

181209
super.doExecute(task, request, finalListener);
182210
}
183211

184212
private void forceStopDatafeed(final StopDatafeedAction.Request request, final ActionListener<StopDatafeedAction.Response> listener,
185-
PersistentTasksCustomMetaData tasks, final List<String> startedDatafeeds) {
213+
PersistentTasksCustomMetaData tasks, final List<String> notStoppedDatafeeds) {
186214
final AtomicInteger counter = new AtomicInteger();
187-
final AtomicArray<Exception> failures = new AtomicArray<>(startedDatafeeds.size());
215+
final AtomicArray<Exception> failures = new AtomicArray<>(notStoppedDatafeeds.size());
188216

189-
for (String datafeedId : startedDatafeeds) {
217+
for (String datafeedId : notStoppedDatafeeds) {
190218
PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
191219
if (datafeedTask != null) {
192220
persistentTasksService.sendRemoveRequest(datafeedTask.getId(),
193221
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
194222
@Override
195223
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
196-
if (counter.incrementAndGet() == startedDatafeeds.size()) {
224+
if (counter.incrementAndGet() == notStoppedDatafeeds.size()) {
197225
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
198226
}
199227
}
200228

201229
@Override
202230
public void onFailure(Exception e) {
203231
final int slot = counter.incrementAndGet();
204-
if ((e instanceof ResourceNotFoundException &&
205-
Strings.isAllOrWildcard(new String[]{request.getDatafeedId()})) == false) {
232+
// We validated that the datafeed names supplied in the request existed when we started processing the action.
233+
// If the related tasks don't exist at this point then they must have been stopped by a simultaneous stop request.
234+
// This is not an error.
235+
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
206236
failures.set(slot - 1, e);
207237
}
208-
if (slot == startedDatafeeds.size()) {
238+
if (slot == notStoppedDatafeeds.size()) {
209239
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
210240
}
211241
}
212242
});
213243
} else {
214-
// This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
244+
// This should not happen, because startedDatafeeds and stoppingDatafeeds
245+
// were derived from the same tasks that were passed to this method
215246
String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found.";
216247
assert datafeedTask != null : msg;
217248
logger.error(msg);
218249
final int slot = counter.incrementAndGet();
219250
failures.set(slot - 1, new RuntimeException(msg));
220-
if (slot == startedDatafeeds.size()) {
251+
if (slot == notStoppedDatafeeds.size()) {
221252
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
222253
}
223254
}
@@ -233,24 +264,25 @@ protected void taskOperation(StopDatafeedAction.Request request, TransportStartD
233264
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
234265
@Override
235266
public void onFailure(Exception e) {
236-
if ((e instanceof ResourceNotFoundException &&
237-
Strings.isAllOrWildcard(new String[]{request.getDatafeedId()}))) {
238-
datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout());
267+
// We validated that the datafeed names supplied in the request existed when we started processing the action.
268+
// If the related task for one of them doesn't exist at this point then it must have been removed by a
269+
// simultaneous force stop request. This is not an error.
270+
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
239271
listener.onResponse(new StopDatafeedAction.Response(true));
240272
} else {
241273
listener.onFailure(e);
242274
}
243275
}
244276

245277
@Override
246-
protected void doRun() throws Exception {
278+
protected void doRun() {
247279
datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout());
248280
listener.onResponse(new StopDatafeedAction.Response(true));
249281
}
250282
});
251283
},
252284
e -> {
253-
if (e instanceof ResourceNotFoundException) {
285+
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
254286
// the task has disappeared so must have stopped
255287
listener.onResponse(new StopDatafeedAction.Response(true));
256288
} else {
@@ -318,7 +350,7 @@ protected StopDatafeedAction.Response newResponse(StopDatafeedAction.Request req
318350
.convertToElastic(failedNodeExceptions.get(0));
319351
} else {
320352
// This can happen when the actual task in the node no longer exists,
321-
// which means the datafeed(s) have already been closed.
353+
// which means the datafeed(s) have already been stopped.
322354
return new StopDatafeedAction.Response(true);
323355
}
324356
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121

2222
import java.io.IOException;
2323
import java.time.ZonedDateTime;
24+
import java.util.Arrays;
2425
import java.util.Date;
2526
import java.util.Iterator;
2627
import java.util.List;
28+
import java.util.Objects;
2729
import java.util.concurrent.BlockingQueue;
2830
import java.util.concurrent.LinkedBlockingDeque;
2931
import java.util.concurrent.TimeUnit;
@@ -37,16 +39,21 @@
3739
*/
3840
public class BlackHoleAutodetectProcess implements AutodetectProcess {
3941

42+
public static final String MAGIC_FAILURE_VALUE = "253402300799";
43+
public static final String MAGIC_FAILURE_VALUE_AS_DATE = "9999-12-31 23:59:59";
44+
4045
private static final String FLUSH_ID = "flush-1";
4146

4247
private final String jobId;
4348
private final ZonedDateTime startTime;
4449
private final BlockingQueue<AutodetectResult> results = new LinkedBlockingDeque<>();
50+
private final Runnable onProcessCrash;
4551
private volatile boolean open = true;
4652

47-
public BlackHoleAutodetectProcess(String jobId) {
53+
public BlackHoleAutodetectProcess(String jobId, Runnable onProcessCrash) {
4854
this.jobId = jobId;
4955
startTime = ZonedDateTime.now();
56+
this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
5057
}
5158

5259
@Override
@@ -59,7 +66,13 @@ public boolean isReady() {
5966
}
6067

6168
@Override
62-
public void writeRecord(String[] record) throws IOException {
69+
public void writeRecord(String[] record) {
70+
if (Arrays.asList(record).contains(MAGIC_FAILURE_VALUE)) {
71+
open = false;
72+
onProcessCrash.run();
73+
AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null);
74+
results.add(result);
75+
}
6376
}
6477

6578
@Override

0 commit comments

Comments (0)