Skip to content

Commit 1d9096f

Browse files
Preserve Task Id for ML Datafeed (#54943)
This change preserves the task id for internal requests for the `StartDatafeedPersistentTask`. Task ids are a way to express a relationship between related internal requests. In this particular case, the task ids are used for debugging and (soon) security auditing, but not for task cancellation, because there is already a graceful-shutdown of child internal requests (given a task id) in place.
1 parent 86d1268 commit 1d9096f

File tree

5 files changed

+23
-13
lines changed

5 files changed

+23
-13
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,12 @@ protected void onCancelled() {
460460
stop(getReasonCancelled(), TimeValue.ZERO);
461461
}
462462

463+
@Override
464+
public boolean shouldCancelChildrenOnCancellation() {
465+
// onCancelled implements graceful shutdown of children
466+
return false;
467+
}
468+
463469
public void stop(String reason, TimeValue timeout) {
464470
if (datafeedManager != null) {
465471
datafeedManager.stopDatafeed(this, reason, timeout);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88
import org.elasticsearch.ResourceNotFoundException;
99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.client.Client;
11+
import org.elasticsearch.client.ParentTaskAssigningClient;
1112
import org.elasticsearch.common.settings.Settings;
1213
import org.elasticsearch.common.unit.TimeValue;
1314
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
1415
import org.elasticsearch.license.RemoteClusterLicenseChecker;
1516
import org.elasticsearch.node.Node;
17+
import org.elasticsearch.tasks.TaskId;
1618
import org.elasticsearch.xpack.core.action.util.QueryPage;
1719
import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
1820
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@@ -74,16 +76,17 @@ public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry,
7476
this.nodeName = nodeName;
7577
}
7678

77-
void build(String datafeedId, ActionListener<DatafeedJob> listener) {
79+
void build(String datafeedId, TaskId parentTaskId, ActionListener<DatafeedJob> listener) {
7880
AtomicReference<Job> jobHolder = new AtomicReference<>();
7981
AtomicReference<DatafeedConfig> datafeedConfigHolder = new AtomicReference<>();
82+
final ParentTaskAssigningClient parentTaskAssigningClient = new ParentTaskAssigningClient(client, parentTaskId);
8083

8184
// Step 5. Build datafeed job object
8285
Consumer<Context> contextHanlder = context -> {
8386
TimeValue frequency = getFrequencyOrDefault(datafeedConfigHolder.get(), jobHolder.get(), xContentRegistry);
8487
TimeValue queryDelay = datafeedConfigHolder.get().getQueryDelay();
85-
DelayedDataDetector delayedDataDetector =
86-
DelayedDataDetectorFactory.buildDetector(jobHolder.get(), datafeedConfigHolder.get(), client, xContentRegistry);
88+
DelayedDataDetector delayedDataDetector = DelayedDataDetectorFactory.buildDetector(jobHolder.get(),
89+
datafeedConfigHolder.get(), parentTaskAssigningClient, xContentRegistry);
8790
DatafeedJob datafeedJob =
8891
new DatafeedJob(
8992
jobHolder.get().getId(),
@@ -92,7 +95,7 @@ void build(String datafeedId, ActionListener<DatafeedJob> listener) {
9295
queryDelay.millis(),
9396
context.dataExtractorFactory,
9497
context.timingStatsReporter,
95-
client,
98+
parentTaskAssigningClient,
9699
auditor,
97100
annotationPersister,
98101
currentTimeSupplier,
@@ -123,7 +126,7 @@ void build(String datafeedId, ActionListener<DatafeedJob> listener) {
123126
context.timingStatsReporter =
124127
new DatafeedTimingStatsReporter(initialTimingStats, jobResultsPersister::persistDatafeedTimingStats);
125128
DataExtractorFactory.create(
126-
client,
129+
parentTaskAssigningClient,
127130
datafeedConfigHolder.get(),
128131
jobHolder.get(),
129132
xContentRegistry,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void onFailure(Exception e) {
108108
}, finishHandler::accept
109109
);
110110

111-
datafeedJobBuilder.build(datafeedId, datafeedJobHandler);
111+
datafeedJobBuilder.build(datafeedId, task.getParentTaskId(), datafeedJobHandler);
112112
}
113113

114114
public void stopDatafeed(TransportStartDatafeedAction.DatafeedTask task, String reason, TimeValue timeout) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.util.concurrent.ThreadContext;
1313
import org.elasticsearch.mock.orig.Mockito;
1414
import org.elasticsearch.node.Node;
15+
import org.elasticsearch.tasks.TaskId;
1516
import org.elasticsearch.test.ESTestCase;
1617
import org.elasticsearch.threadpool.ThreadPool;
1718
import org.elasticsearch.xpack.core.action.util.QueryPage;
@@ -124,7 +125,7 @@ public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception {
124125
givenJob(jobBuilder);
125126
givenDatafeed(datafeed);
126127

127-
datafeedJobBuilder.build("datafeed1", datafeedJobHandler);
128+
datafeedJobBuilder.build("datafeed1", new TaskId(""), datafeedJobHandler);
128129

129130
assertBusy(() -> wasHandlerCalled.get());
130131
}
@@ -152,7 +153,7 @@ public void testBuild_GivenScrollDatafeedAndOldJobWithLatestRecordTimestampAfter
152153
givenJob(jobBuilder);
153154
givenDatafeed(datafeed);
154155

155-
datafeedJobBuilder.build("datafeed1", datafeedJobHandler);
156+
datafeedJobBuilder.build("datafeed1", new TaskId(""), datafeedJobHandler);
156157

157158
assertBusy(() -> wasHandlerCalled.get());
158159
}
@@ -180,7 +181,7 @@ public void testBuild_GivenScrollDatafeedAndOldJobWithLatestBucketAfterLatestRec
180181
givenJob(jobBuilder);
181182
givenDatafeed(datafeed);
182183

183-
datafeedJobBuilder.build("datafeed1", datafeedJobHandler);
184+
datafeedJobBuilder.build("datafeed1", new TaskId(""), datafeedJobHandler);
184185

185186
assertBusy(() -> wasHandlerCalled.get());
186187
}
@@ -205,7 +206,7 @@ public void testBuild_GivenBucketsRequestFails() {
205206
givenJob(jobBuilder);
206207
givenDatafeed(datafeed);
207208

208-
datafeedJobBuilder.build("datafeed1", ActionListener.wrap(datafeedJob -> fail(), taskHandler));
209+
datafeedJobBuilder.build("datafeed1", new TaskId(""), ActionListener.wrap(datafeedJob -> fail(), taskHandler));
209210

210211
verify(taskHandler).accept(error);
211212
}
@@ -247,7 +248,7 @@ public void testBuildGivenRemoteIndicesButNoRemoteSearching() throws Exception {
247248

248249
givenJob(jobBuilder);
249250
givenDatafeed(datafeed);
250-
datafeedJobBuilder.build("datafeed1", datafeedJobHandler);
251+
datafeedJobBuilder.build("datafeed1", new TaskId(""), datafeedJobHandler);
251252
assertBusy(() -> wasHandlerCalled.get());
252253
}
253254

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,10 @@ public void setUpTests() {
118118
DatafeedJobBuilder datafeedJobBuilder = mock(DatafeedJobBuilder.class);
119119
doAnswer(invocationOnMock -> {
120120
@SuppressWarnings("rawtypes")
121-
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1];
121+
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
122122
listener.onResponse(datafeedJob);
123123
return null;
124-
}).when(datafeedJobBuilder).build(any(), any());
124+
}).when(datafeedJobBuilder).build(any(), any(), any());
125125

126126
hasOpenAutodetectCommunicator = new AtomicBoolean(true);
127127
AutodetectProcessManager autodetectProcessManager = mock(AutodetectProcessManager.class);

0 commit comments

Comments
 (0)