Skip to content

Commit 1596a0f

Browse files
authored
[ML] Fixes for stop datafeed edge cases (#49286)
The following edge cases were fixed: 1. A request to force-stop a stopping datafeed is no longer ignored. Force-stop is an important recovery mechanism if normal stop doesn't work for some reason, and needs to operate on a datafeed in any state other than stopped. 2. If the node that a datafeed is running on is removed from the cluster during a normal stop then the stop request is retried (and will likely succeed on this retry by simply cancelling the persistent task for the affected datafeed). 3. If there are multiple simultaneous force-stop requests for the same datafeed we no longer fail the one that is processed second. The previous behaviour was wrong as stopping a stopped datafeed is not an error, so stopping a datafeed twice simultaneously should not be either. Backport of #49191
1 parent bcbb2c9 commit 1596a0f

File tree

8 files changed

+290
-45
lines changed

8 files changed

+290
-45
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

+1
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ public static JobState getJobStateModifiedForReassignments(@Nullable PersistentT
132132

133133
public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
134134
PersistentTasksCustomMetaData.PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
135+
// TODO: report (task != null && task.getState() == null) as STARTING in version 8, and fix side effects
135136
if (task != null && task.getState() != null) {
136137
return (DatafeedState) task.getState();
137138
} else {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
536536
}
537537
} else {
538538
autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) ->
539-
new BlackHoleAutodetectProcess(job.getId());
539+
new BlackHoleAutodetectProcess(job.getId(), onProcessCrash);
540540
// factor of 1.0 makes renormalization a no-op
541541
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0);
542542
analyticsProcessFactory = (jobId, analyticsProcessConfig, state, executorService, onProcessCrash) -> null;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java

+59-28
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
import org.elasticsearch.cluster.ClusterState;
1717
import org.elasticsearch.cluster.node.DiscoveryNodes;
1818
import org.elasticsearch.cluster.service.ClusterService;
19-
import org.elasticsearch.common.Strings;
2019
import org.elasticsearch.common.inject.Inject;
2120
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2221
import org.elasticsearch.common.util.concurrent.AtomicArray;
2322
import org.elasticsearch.discovery.MasterNotDiscoveredException;
23+
import org.elasticsearch.persistent.PersistentTasksClusterService;
2424
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2525
import org.elasticsearch.persistent.PersistentTasksService;
2626
import org.elasticsearch.tasks.Task;
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
3535

3636
import java.util.ArrayList;
37+
import java.util.Collection;
3738
import java.util.HashSet;
3839
import java.util.List;
3940
import java.util.Set;
@@ -68,32 +69,46 @@ public TransportStopDatafeedAction(TransportService transportService, ThreadPool
6869
* @param tasks Persistent task meta data
6970
* @param startedDatafeedIds Started datafeed ids are added to this list
7071
* @param stoppingDatafeedIds Stopping datafeed ids are added to this list
72+
* @param notStoppedDatafeedIds Datafeed ids are added to this list for all datafeeds that are not stopped
7173
*/
72-
static void sortDatafeedIdsByTaskState(Set<String> expandedDatafeedIds,
74+
static void sortDatafeedIdsByTaskState(Collection<String> expandedDatafeedIds,
7375
PersistentTasksCustomMetaData tasks,
7476
List<String> startedDatafeedIds,
75-
List<String> stoppingDatafeedIds) {
77+
List<String> stoppingDatafeedIds,
78+
List<String> notStoppedDatafeedIds) {
7679

7780
for (String expandedDatafeedId : expandedDatafeedIds) {
7881
addDatafeedTaskIdAccordingToState(expandedDatafeedId, MlTasks.getDatafeedState(expandedDatafeedId, tasks),
79-
startedDatafeedIds, stoppingDatafeedIds);
82+
startedDatafeedIds, stoppingDatafeedIds, notStoppedDatafeedIds);
8083
}
8184
}
8285

8386
private static void addDatafeedTaskIdAccordingToState(String datafeedId,
8487
DatafeedState datafeedState,
8588
List<String> startedDatafeedIds,
86-
List<String> stoppingDatafeedIds) {
89+
List<String> stoppingDatafeedIds,
90+
List<String> notStoppedDatafeedIds) {
8791
switch (datafeedState) {
92+
case STARTING:
93+
// The STARTING state is not used anywhere at present, so this should never happen.
94+
// At present datafeeds that have a persistent task that hasn't yet been assigned
95+
// a state are reported as STOPPED (which is not great). It could be considered a
96+
// breaking change to introduce the STARTING state though, so let's aim to do it in
97+
// version 8. Also consider treating STARTING like STARTED for stop API behaviour.
98+
notStoppedDatafeedIds.add(datafeedId);
99+
break;
88100
case STARTED:
89101
startedDatafeedIds.add(datafeedId);
102+
notStoppedDatafeedIds.add(datafeedId);
90103
break;
91104
case STOPPED:
92105
break;
93106
case STOPPING:
94107
stoppingDatafeedIds.add(datafeedId);
108+
notStoppedDatafeedIds.add(datafeedId);
95109
break;
96110
default:
111+
assert false : "Unexpected datafeed state " + datafeedState;
97112
break;
98113
}
99114
}
@@ -118,17 +133,18 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
118133

119134
List<String> startedDatafeeds = new ArrayList<>();
120135
List<String> stoppingDatafeeds = new ArrayList<>();
121-
sortDatafeedIdsByTaskState(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds);
136+
List<String> notStoppedDatafeeds = new ArrayList<>();
137+
sortDatafeedIdsByTaskState(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds, notStoppedDatafeeds);
122138
if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) {
123139
listener.onResponse(new StopDatafeedAction.Response(true));
124140
return;
125141
}
126142
request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()]));
127143

128144
if (request.isForce()) {
129-
forceStopDatafeed(request, listener, tasks, startedDatafeeds);
145+
forceStopDatafeed(request, listener, tasks, notStoppedDatafeeds);
130146
} else {
131-
normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds);
147+
normalStopDatafeed(task, request, listener, tasks, nodes, startedDatafeeds, stoppingDatafeeds);
132148
}
133149
},
134150
listener::onFailure
@@ -137,20 +153,20 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
137153
}
138154

139155
private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, ActionListener<StopDatafeedAction.Response> listener,
140-
PersistentTasksCustomMetaData tasks,
156+
PersistentTasksCustomMetaData tasks, DiscoveryNodes nodes,
141157
List<String> startedDatafeeds, List<String> stoppingDatafeeds) {
142-
Set<String> executorNodes = new HashSet<>();
158+
final Set<String> executorNodes = new HashSet<>();
143159
for (String datafeedId : startedDatafeeds) {
144160
PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
145161
if (datafeedTask == null) {
146162
// This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
147163
String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found.";
148164
assert datafeedTask != null : msg;
149165
logger.error(msg);
150-
} else if (datafeedTask.isAssigned()) {
166+
} else if (PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes) == false) {
151167
executorNodes.add(datafeedTask.getExecutorNode());
152168
} else {
153-
// This is the easy case - the datafeed is not currently assigned to a node,
169+
// This is the easy case - the datafeed is not currently assigned to a valid node,
154170
// so can be gracefully stopped simply by removing its persistent task. (Usually
155171
// a graceful stop cannot be achieved by simply removing the persistent task, but
156172
// if the datafeed has no running code then graceful/forceful are the same.)
@@ -171,48 +187,62 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A
171187

172188
ActionListener<StopDatafeedAction.Response> finalListener = ActionListener.wrap(
173189
r -> waitForDatafeedStopped(allDataFeedsToWaitFor, request, r, listener),
174-
listener::onFailure);
190+
e -> {
191+
if (ExceptionsHelper.unwrapCause(e) instanceof FailedNodeException) {
192+
// A node has dropped out of the cluster since we started executing the requests.
193+
// Since stopping an already stopped datafeed is not an error we can try again.
194+
// The datafeeds that were running on the node that dropped out of the cluster
195+
// will just have their persistent tasks cancelled. Datafeeds that were stopped
196+
// by the previous attempt will be noops in the subsequent attempt.
197+
doExecute(task, request, listener);
198+
} else {
199+
listener.onFailure(e);
200+
}
201+
});
175202

176203
super.doExecute(task, request, finalListener);
177204
}
178205

179206
private void forceStopDatafeed(final StopDatafeedAction.Request request, final ActionListener<StopDatafeedAction.Response> listener,
180-
PersistentTasksCustomMetaData tasks, final List<String> startedDatafeeds) {
207+
PersistentTasksCustomMetaData tasks, final List<String> notStoppedDatafeeds) {
181208
final AtomicInteger counter = new AtomicInteger();
182-
final AtomicArray<Exception> failures = new AtomicArray<>(startedDatafeeds.size());
209+
final AtomicArray<Exception> failures = new AtomicArray<>(notStoppedDatafeeds.size());
183210

184-
for (String datafeedId : startedDatafeeds) {
211+
for (String datafeedId : notStoppedDatafeeds) {
185212
PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
186213
if (datafeedTask != null) {
187214
persistentTasksService.sendRemoveRequest(datafeedTask.getId(),
188215
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
189216
@Override
190217
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
191-
if (counter.incrementAndGet() == startedDatafeeds.size()) {
218+
if (counter.incrementAndGet() == notStoppedDatafeeds.size()) {
192219
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
193220
}
194221
}
195222

196223
@Override
197224
public void onFailure(Exception e) {
198225
final int slot = counter.incrementAndGet();
199-
if ((ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException &&
200-
Strings.isAllOrWildcard(new String[]{request.getDatafeedId()})) == false) {
226+
// We validated that the datafeed names supplied in the request existed when we started processing the action.
227+
// If the related tasks don't exist at this point then they must have been stopped by a simultaneous stop request.
228+
// This is not an error.
229+
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
201230
failures.set(slot - 1, e);
202231
}
203-
if (slot == startedDatafeeds.size()) {
232+
if (slot == notStoppedDatafeeds.size()) {
204233
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
205234
}
206235
}
207236
});
208237
} else {
209-
// This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
238+
// This should not happen, because startedDatafeeds and stoppingDatafeeds
239+
// were derived from the same tasks that were passed to this method
210240
String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found.";
211241
assert datafeedTask != null : msg;
212242
logger.error(msg);
213243
final int slot = counter.incrementAndGet();
214244
failures.set(slot - 1, new RuntimeException(msg));
215-
if (slot == startedDatafeeds.size()) {
245+
if (slot == notStoppedDatafeeds.size()) {
216246
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
217247
}
218248
}
@@ -228,17 +258,18 @@ protected void taskOperation(StopDatafeedAction.Request request, TransportStartD
228258
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
229259
@Override
230260
public void onFailure(Exception e) {
231-
if ((e instanceof ResourceNotFoundException &&
232-
Strings.isAllOrWildcard(new String[]{request.getDatafeedId()}))) {
233-
datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout());
261+
// We validated that the datafeed names supplied in the request existed when we started processing the action.
262+
// If the related task for one of them doesn't exist at this point then it must have been removed by a
263+
// simultaneous force stop request. This is not an error.
264+
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
234265
listener.onResponse(new StopDatafeedAction.Response(true));
235266
} else {
236267
listener.onFailure(e);
237268
}
238269
}
239270

240271
@Override
241-
protected void doRun() throws Exception {
272+
protected void doRun() {
242273
datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout());
243274
listener.onResponse(new StopDatafeedAction.Response(true));
244275
}
@@ -312,8 +343,8 @@ protected StopDatafeedAction.Response newResponse(StopDatafeedAction.Request req
312343
throw org.elasticsearch.ExceptionsHelper
313344
.convertToElastic(failedNodeExceptions.get(0));
314345
} else {
315-
// This can happen we the actual task in the node no longer exists,
316-
// which means the datafeed(s) have already been closed.
346+
// This can happen when the actual task in the node no longer exists,
347+
// which means the datafeed(s) have already been stopped.
317348
return new StopDatafeedAction.Response(true);
318349
}
319350
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@
2121

2222
import java.io.IOException;
2323
import java.time.ZonedDateTime;
24+
import java.util.Arrays;
2425
import java.util.Date;
2526
import java.util.Iterator;
2627
import java.util.List;
28+
import java.util.Objects;
2729
import java.util.concurrent.BlockingQueue;
2830
import java.util.concurrent.LinkedBlockingDeque;
2931
import java.util.concurrent.TimeUnit;
32+
import java.util.function.Consumer;
3033

3134
/**
3235
* A placeholder class simulating the actions of the native Autodetect process.
@@ -37,16 +40,21 @@
3740
*/
3841
public class BlackHoleAutodetectProcess implements AutodetectProcess {
3942

43+
public static final String MAGIC_FAILURE_VALUE = "253402300799";
44+
public static final String MAGIC_FAILURE_VALUE_AS_DATE = "9999-12-31 23:59:59";
45+
4046
private static final String FLUSH_ID = "flush-1";
4147

4248
private final String jobId;
4349
private final ZonedDateTime startTime;
4450
private final BlockingQueue<AutodetectResult> results = new LinkedBlockingDeque<>();
51+
private final Consumer<String> onProcessCrash;
4552
private volatile boolean open = true;
4653

47-
public BlackHoleAutodetectProcess(String jobId) {
54+
public BlackHoleAutodetectProcess(String jobId, Consumer<String> onProcessCrash) {
4855
this.jobId = jobId;
4956
startTime = ZonedDateTime.now();
57+
this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
5058
}
5159

5260
@Override
@@ -59,7 +67,13 @@ public boolean isReady() {
5967
}
6068

6169
@Override
62-
public void writeRecord(String[] record) throws IOException {
70+
public void writeRecord(String[] record) {
71+
if (Arrays.asList(record).contains(MAGIC_FAILURE_VALUE)) {
72+
open = false;
73+
onProcessCrash.accept("simulated failure");
74+
AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null);
75+
results.add(result);
76+
}
6377
}
6478

6579
@Override

0 commit comments

Comments (0)