diff --git a/docs/reference/ml/anomaly-detection/apis/get-datafeed-stats.asciidoc b/docs/reference/ml/anomaly-detection/apis/get-datafeed-stats.asciidoc index 1ce265df2d29a..b932a42deaafd 100644 --- a/docs/reference/ml/anomaly-detection/apis/get-datafeed-stats.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/get-datafeed-stats.asciidoc @@ -24,7 +24,7 @@ Retrieves usage information for {dfeeds}. [[ml-get-datafeed-stats-prereqs]] == {api-prereq-title} -Requires the `monitor_ml` cluster privilege. This privilege is included in the +Requires the `monitor_ml` cluster privilege. This privilege is included in the `machine_learning_user` built-in role. [[ml-get-datafeed-stats-desc]] @@ -103,6 +103,24 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=node-transport-address] ==== -- +`running_state`:: +(object) An object containing the running state for this {dfeed}. It is only +provided if the {dfeed} is started. ++ +-- +[%collapsible%open] +==== +`real_time_configured`::: +(boolean) Indicates if the {dfeed} is "real-time"; meaning that the {dfeed} +has no configured `end` time. + +`real_time_running`::: +(boolean) Indicates whether the {dfeed} has finished running on the available +past data. For {dfeeds} without a configured `end` time, this means that +the {dfeed} is now running on "real-time" data. +==== +-- + `state`:: (string) include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=state-datafeed] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java new file mode 100644 index 0000000000000..999c17290747b --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java @@ -0,0 +1,170 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.tasks.BaseTasksRequest; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xpack.core.ml.MlTasks; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + + +/** + * Internal only action to get the current running state of a datafeed + */ +public class GetDatafeedRunningStateAction extends ActionType { + + public static final GetDatafeedRunningStateAction INSTANCE = new GetDatafeedRunningStateAction(); + public static final String NAME = "cluster:internal/xpack/ml/datafeed/running_state"; + + private GetDatafeedRunningStateAction() { + super(NAME, GetDatafeedRunningStateAction.Response::new); + } + + public static class Request extends BaseTasksRequest { + + private final Set datafeedTaskIds; + + public Request(List datafeedIds) { + this.datafeedTaskIds = datafeedIds.stream().map(MlTasks::datafeedTaskId).collect(Collectors.toSet()); + } + + public Request(StreamInput in) throws IOException { + super(in); + this.datafeedTaskIds = in.readSet(StreamInput::readString); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringCollection(datafeedTaskIds); + } + + public Set getDatafeedTaskIds() { + return datafeedTaskIds; + } + + @Override + public boolean match(Task task) { + return task instanceof StartDatafeedAction.DatafeedTaskMatcher && datafeedTaskIds.contains(task.getDescription()); + } + } + + public static class Response extends BaseTasksResponse { + + public static class RunningState implements Writeable, ToXContentObject { + + // Is the datafeed a "realtime" datafeed, meaning it was started without an end_time + private final boolean realTimeConfigured; + // Has the look back finished and are we now running on "real-time" data + private final boolean realTimeRunning; + + public RunningState(boolean realTimeConfigured, boolean realTimeRunning) { + this.realTimeConfigured = realTimeConfigured; + this.realTimeRunning = realTimeRunning; + } + + public RunningState(StreamInput in) throws IOException { + this.realTimeConfigured = in.readBoolean(); + this.realTimeRunning = in.readBoolean(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RunningState that = (RunningState) o; + return realTimeConfigured == that.realTimeConfigured && realTimeRunning == that.realTimeRunning; + } + + @Override + public int hashCode() { + return Objects.hash(realTimeConfigured, realTimeRunning); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(realTimeConfigured); + out.writeBoolean(realTimeRunning); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("real_time_configured", realTimeConfigured); + builder.field("real_time_running", realTimeRunning); + builder.endObject(); + return builder; + } + } + + private final Map datafeedRunningState; + + public static Response fromResponses(List responses) { + return new Response(responses.stream() + .flatMap(r -> r.datafeedRunningState.entrySet().stream()) + .filter(entry -> entry.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + } + + public static Response fromTaskAndState(String datafeedId, RunningState runningState) { + return new Response(MapBuilder.newMapBuilder().put(datafeedId, runningState).map()); + } + + public Response(StreamInput in) throws IOException { + super(in); + datafeedRunningState = in.readMap(StreamInput::readString, RunningState::new); + } + + public Response(Map runtimeStateMap) { + super(null, null); + this.datafeedRunningState = runtimeStateMap; + } + + public Optional getRunningState(String datafeedId) { + return Optional.ofNullable(datafeedRunningState.get(datafeedId)); + } + + public Map getDatafeedRunningState() { + return datafeedRunningState; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(datafeedRunningState, StreamOutput::writeString, (o, w) -> w.writeTo(o)); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return Objects.equals(this.datafeedRunningState, response.datafeedRunningState); + } + + @Override + public int hashCode() { + return Objects.hash(datafeedRunningState); + } + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java index 1094a245e4d07..ec59bf29007c9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java @@ -6,9 +6,11 @@ */ package org.elasticsearch.xpack.core.ml.action; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -16,8 +18,11 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.xpack.core.action.AbstractGetResourcesResponse; import org.elasticsearch.xpack.core.action.util.QueryPage; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response.RunningState; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; @@ -25,9 +30,12 @@ import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; +import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; public class GetDatafeedsStatsAction extends ActionType { @@ -39,6 +47,7 @@ public class GetDatafeedsStatsAction extends ActionType getResponse() { protected Reader getReader() { return DatafeedStats::new; } + + public static class Builder { + private List statsBuilders; + private Map datafeedToJobId; + private Map timingStatsMap; + private GetDatafeedRunningStateAction.Response datafeedRuntimeState; + + public Builder setDatafeedIds(Collection datafeedIds) { + this.statsBuilders = datafeedIds.stream() + .map(GetDatafeedsStatsAction.Response.DatafeedStats::builder) + .collect(Collectors.toList()); + return this; + } + + public List getDatafeedIds() { + return statsBuilders.stream().map(DatafeedStats.Builder::getDatafeedId).collect(Collectors.toList()); + } + + public Builder setDatafeedToJobId(Map datafeedToJobId) { + this.datafeedToJobId = datafeedToJobId; + return this; + } + + public Builder setTimingStatsMap(Map timingStatsMap) { + this.timingStatsMap = timingStatsMap; + return this; + } + + public Builder setDatafeedRuntimeState(GetDatafeedRunningStateAction.Response datafeedRuntimeState) { + this.datafeedRuntimeState = datafeedRuntimeState; + return this; + } + + public Response build(PersistentTasksCustomMetadata tasksInProgress, ClusterState state) { + List stats = statsBuilders.stream().map(statsBuilder-> { + final String jobId = datafeedToJobId.get(statsBuilder.datafeedId); + DatafeedTimingStats timingStats = jobId == null ? + null : + timingStatsMap.getOrDefault(jobId, new DatafeedTimingStats(jobId)); + PersistentTasksCustomMetadata.PersistentTask maybeTask = MlTasks.getDatafeedTask( + statsBuilder.datafeedId, + tasksInProgress + ); + DatafeedState datafeedState = MlTasks.getDatafeedState(statsBuilder.datafeedId, tasksInProgress); + return statsBuilder.setNode(maybeTask != null ? state.getNodes().get(maybeTask.getExecutorNode()) : null) + .setDatafeedState(datafeedState) + .setAssignmentExplanation(maybeTask != null ? maybeTask.getAssignment().getExplanation() : null) + .setTimingStats(timingStats) + .setRunningState(datafeedRuntimeState.getRunningState(statsBuilder.datafeedId).orElse(null)) + .build(); + }).collect(Collectors.toList()); + return new Response(new QueryPage<>(stats, stats.size(), DatafeedConfig.RESULTS_FIELD)); + } + } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateActionResponseTests.java new file mode 100644 index 0000000000000..8146db9fd350d --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateActionResponseTests.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response; + +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class GetDatafeedRunningStateActionResponseTests extends AbstractWireSerializingTestCase { + + static Response.RunningState randomRunningState() { + return new Response.RunningState(randomBoolean(), randomBoolean()); + } + + @Override + protected Response createTestInstance() { + int listSize = randomInt(10); + return new Response(Stream.generate(() -> randomAlphaOfLength(10)) + .limit(listSize) + .collect(Collectors.toMap(Function.identity(), _unused -> randomRunningState()))); + } + + @Override + protected Writeable.Reader instanceReader() { + return Response::new; + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedStatsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedStatsActionResponseTests.java index 74430fe595798..8f3053307274a 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedStatsActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedStatsActionResponseTests.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Set; +import static org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateActionResponseTests.randomRunningState; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; @@ -54,7 +55,14 @@ protected Response createTestInstance() { : new DiscoveryNode("_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT); String explanation = randomBoolean() ? null : randomAlphaOfLength(3); DatafeedTimingStats timingStats = randomBoolean() ? null : DatafeedTimingStatsTests.createRandom(); - Response.DatafeedStats datafeedStats = new Response.DatafeedStats(datafeedId, datafeedState, node, explanation, timingStats); + Response.DatafeedStats datafeedStats = new Response.DatafeedStats( + datafeedId, + datafeedState, + node, + explanation, + timingStats, + randomBoolean() ? null : randomRunningState() + ); datafeedStatsList.add(datafeedStats); } @@ -83,7 +91,14 @@ public void testDatafeedStatsToXContent() throws IOException { DatafeedTimingStats timingStats = new DatafeedTimingStats("my-job-id", 5, 10, 100.0, new ExponentialAverageCalculationContext(50.0, null, null)); - Response.DatafeedStats stats = new Response.DatafeedStats("df-id", DatafeedState.STARTED, node, null, timingStats); + Response.DatafeedStats stats = new Response.DatafeedStats( + "df-id", + DatafeedState.STARTED, + node, + null, + timingStats, + randomRunningState() + ); XContentType xContentType = randomFrom(XContentType.values()); BytesReference bytes; @@ -94,11 +109,16 @@ public void testDatafeedStatsToXContent() throws IOException { Map dfStatsMap = XContentHelper.convertToMap(bytes, randomBoolean(), xContentType).v2(); - assertThat(dfStatsMap.size(), is(equalTo(4))); + assertThat(dfStatsMap.size(), is(equalTo(5))); assertThat(dfStatsMap, hasEntry("datafeed_id", "df-id")); assertThat(dfStatsMap, hasEntry("state", "started")); assertThat(dfStatsMap, hasKey("node")); assertThat(dfStatsMap, hasKey("timing_stats")); + assertThat(dfStatsMap, hasKey("running_state")); + + Map runningStateMap = (Map) dfStatsMap.get("running_state"); + assertThat(runningStateMap, hasKey("real_time_configured")); + assertThat(runningStateMap, hasKey("real_time_running")); Map nodeMap = (Map) dfStatsMap.get("node"); assertThat(nodeMap, hasEntry("id", "df-node-id")); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java index 551e826749deb..89d8ec27b8b0e 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java @@ -14,9 +14,9 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.SecuritySettingsSourceField; import org.elasticsearch.test.rest.ESRestTestCase; -import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner; import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; import org.elasticsearch.xpack.core.rollup.job.RollupJob; @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -1018,13 +1019,29 @@ public void testRealtime() throws Exception { String jobId = "job-realtime-1"; createJob(jobId, "airline"); String datafeedId = jobId + "-datafeed"; - new DatafeedBuilder(datafeedId, jobId, "airline-data").build(); + new DatafeedBuilder(datafeedId, jobId, "airline-data").setFrequency(TimeValue.timeValueSeconds(5)).build(); openJob(client(), jobId); Request startRequest = new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start"); startRequest.addParameter("start", "2016-06-01T00:00:00Z"); Response response = client().performRequest(startRequest); assertThat(EntityUtils.toString(response.getEntity()), containsString("\"started\":true")); + + // We should now be running in real time but may or may not have finished look back + assertBusy(() -> { + try { + Response datafeedStatsResponse = client().performRequest(new Request("GET", + MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stats")); + String body = EntityUtils.toString(datafeedStatsResponse.getEntity()); + assertThat(body, containsString("\"real_time_configured\":true")); + assertThat(body, anyOf( + containsString("\"real_time_running\":true"), + containsString("\"real_time_running\":false") + )); + } catch (Exception e1) { + throw new RuntimeException(e1); + } + }); assertBusy(() -> { try { Response getJobResponse = client().performRequest(new Request("GET", @@ -1057,6 +1074,19 @@ public void testRealtime() throws Exception { assertThat(EntityUtils.toString(response.getEntity()), containsString("Cannot delete job [" + jobId + "] because the job is opened")); + // Look back should now be completed and we are still considered a real time datafeed (no endtime set) + assertBusy(() -> { + try { + Response datafeedStatsResponse = client().performRequest(new Request("GET", + MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stats")); + String body = EntityUtils.toString(datafeedStatsResponse.getEntity()); + assertThat(body, containsString("\"real_time_configured\":true")); + assertThat(body, containsString("\"real_time_running\":true")); + } catch (Exception e1) { + throw new RuntimeException(e1); + } + }); + response = client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop")); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); assertThat(EntityUtils.toString(response.getEntity()), equalTo("{\"stopped\":true}")); @@ -1246,6 +1276,7 @@ private static class DatafeedBuilder { String secondaryAuthHeader = null; String chunkingTimespan; String indicesOptions; + TimeValue frequency; DatafeedBuilder(String datafeedId, String jobId, String index) { this.datafeedId = datafeedId; @@ -1253,8 +1284,8 @@ private static class DatafeedBuilder { this.index = index; } - DatafeedBuilder setSource(boolean enableSource) { - this.source = enableSource; + public DatafeedBuilder setFrequency(TimeValue frequency) { + this.frequency = frequency; return this; } @@ -1295,6 +1326,7 @@ Response build() throws IOException { + (source ? ",\"_source\":true" : "") + (scriptedFields == null ? "" : ",\"script_fields\":" + scriptedFields) + (aggregations == null ? "" : ",\"aggs\":" + aggregations) + + (frequency == null ? "" : ",\"frequency\":\"" + frequency + "\"") + (indicesOptions == null ? "" : ",\"indices_options\":" + indicesOptions) + (chunkingTimespan == null ? "" : ",\"chunking_config\":{\"mode\":\"MANUAL\",\"time_span\":\"" + chunkingTimespan + "\"}") diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 9e24ec2b441a3..30758c8b1c8f3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -98,6 +98,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.DeleteTrainedModelAction; import org.elasticsearch.xpack.core.ml.action.DeleteTrainedModelAliasAction; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction; import org.elasticsearch.xpack.core.ml.action.InferTrainedModelDeploymentAction; import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction; import org.elasticsearch.xpack.core.ml.action.EstimateModelMemoryAction; @@ -184,6 +185,7 @@ import org.elasticsearch.xpack.ml.action.TransportDeleteModelSnapshotAction; import org.elasticsearch.xpack.ml.action.TransportDeleteTrainedModelAction; import org.elasticsearch.xpack.ml.action.TransportDeleteTrainedModelAliasAction; +import org.elasticsearch.xpack.ml.action.TransportGetDatafeedRunningStateAction; import org.elasticsearch.xpack.ml.action.TransportInferTrainedModelDeploymentAction; import org.elasticsearch.xpack.ml.action.TransportStartTrainedModelDeploymentAction; import org.elasticsearch.xpack.ml.action.TransportEstimateModelMemoryAction; @@ -1068,6 +1070,7 @@ public List getRestHandlers(Settings settings, RestController restC new ActionHandler<>(StartTrainedModelDeploymentAction.INSTANCE, TransportStartTrainedModelDeploymentAction.class), new ActionHandler<>(StopTrainedModelDeploymentAction.INSTANCE, TransportStopTrainedModelDeploymentAction.class), new ActionHandler<>(InferTrainedModelDeploymentAction.INSTANCE, TransportInferTrainedModelDeploymentAction.class), + new ActionHandler<>(GetDatafeedRunningStateAction.INSTANCE, TransportGetDatafeedRunningStateAction.class), usageAction, infoAction); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedRunningStateAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedRunningStateAction.java new file mode 100644 index 0000000000000..352c2e73a20fc --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedRunningStateAction.java @@ -0,0 +1,144 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.ml.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Request; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response; +import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class TransportGetDatafeedRunningStateAction extends TransportTasksAction< + TransportStartDatafeedAction.DatafeedTask, + Request, + Response, + Response> { + + private static final Logger logger = LogManager.getLogger(TransportGetDatafeedRunningStateAction.class); + + @Inject + public TransportGetDatafeedRunningStateAction( + TransportService transportService, + ClusterService clusterService, + ActionFilters actionFilters + ) { + super( + GetDatafeedRunningStateAction.NAME, + clusterService, + transportService, + actionFilters, + Request::new, + Response::new, + Response::new, + ThreadPool.Names.MANAGEMENT + ); + } + + @Override + protected Response newResponse(Request request, + List tasks, + List taskOperationFailures, + List failedNodeExceptions) { + org.elasticsearch.ExceptionsHelper.rethrowAndSuppress( + taskOperationFailures.stream() + .map(t -> org.elasticsearch.ExceptionsHelper.convertToElastic(t.getCause())) + .collect(Collectors.toList()) + ); + org.elasticsearch.ExceptionsHelper.rethrowAndSuppress(failedNodeExceptions); + return Response.fromResponses(tasks); + } + + @Override + protected void taskOperation( + Request request, + TransportStartDatafeedAction.DatafeedTask datafeedTask, + ActionListener listener + ) { + listener.onResponse(Response.fromTaskAndState(datafeedTask.getDatafeedId(), datafeedTask.getRunningState().orElse(null))); + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + DiscoveryNodes nodes = clusterService.state().nodes(); + PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE); + if (tasks == null) { + listener.onResponse(new Response(Collections.emptyMap())); + return; + } + final List> datafeedTasks = request.getDatafeedTaskIds() + .stream() + .map(tasks::getTask) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (datafeedTasks.isEmpty()) { + listener.onResponse(new Response(Collections.emptyMap())); + return; + } + + // Do this to catch datafeed tasks that have been created but are currently not assigned to a node. + ActionListener taskResponseListener = ActionListener.wrap( + actionResponses -> { + Map runningStateMap = actionResponses.getDatafeedRunningState(); + if (runningStateMap.size() == datafeedTasks.size()) { + listener.onResponse(actionResponses); + return; + } + List missingResponses = new ArrayList<>(); + missingResponses.add(actionResponses); + missingResponses.add(new Response(datafeedTasks.stream() + .map(t -> (StartDatafeedAction.DatafeedParams)t.getParams()) + .filter(datafeedParams -> runningStateMap.containsKey(datafeedParams.getDatafeedId()) == false) + .collect(Collectors.toMap( + StartDatafeedAction.DatafeedParams::getDatafeedId, + // If it isn't assigned to a node, assume that look back hasn't completed yet + params -> new Response.RunningState(params.getEndTime() == null, false) + )))); + listener.onResponse(Response.fromResponses(missingResponses)); + }, + listener::onFailure + ); + + String[] nodesOfConcern = datafeedTasks.stream().map(PersistentTasksCustomMetadata.PersistentTask::getExecutorNode) + .filter(Objects::nonNull) + .filter(nodes::nodeExists) + .toArray(String[]::new); + + if (nodesOfConcern.length == 0) { + logger.debug(() -> new ParameterizedMessage( + "Unable to find executor nodes for datafeed tasks {}", + request.getDatafeedTaskIds() + )); + + taskResponseListener.onResponse(new Response(Collections.emptyMap())); + return; + } + request.setNodes(nodesOfConcern); + super.doExecute(task, request, taskResponseListener); + } + +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java index 2acde3bdfe3d1..e6b15091e3bda 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java @@ -8,108 +8,124 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.action.util.QueryPage; -import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction.Response; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction.Request; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.SortedSet; -import java.util.function.Function; import java.util.stream.Collectors; -public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAction { +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; + +public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAction { private static final Logger logger = LogManager.getLogger(TransportGetDatafeedsStatsAction.class); private final DatafeedConfigProvider datafeedConfigProvider; private final JobResultsProvider jobResultsProvider; + private final OriginSettingClient client; @Inject public TransportGetDatafeedsStatsAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - DatafeedConfigProvider datafeedConfigProvider, JobResultsProvider jobResultsProvider) { - super(GetDatafeedsStatsAction.NAME, transportService, clusterService, threadPool, actionFilters, - GetDatafeedsStatsAction.Request::new, indexNameExpressionResolver, GetDatafeedsStatsAction.Response::new, - ThreadPool.Names.SAME); + DatafeedConfigProvider datafeedConfigProvider, JobResultsProvider jobResultsProvider, + Client client) { + super( + GetDatafeedsStatsAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + Request::new, + indexNameExpressionResolver, + Response::new, + ThreadPool.Names.SAME + ); this.datafeedConfigProvider = datafeedConfigProvider; this.jobResultsProvider = jobResultsProvider; + this.client = new OriginSettingClient(client, ML_ORIGIN); } @Override - protected void masterOperation(Task task, GetDatafeedsStatsAction.Request request, - ClusterState state, - ActionListener listener) { - logger.debug("Get stats for datafeed '{}'", request.getDatafeedId()); + protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) { + logger.debug(() -> new ParameterizedMessage("[{}] get stats for datafeed", request.getDatafeedId())); final PersistentTasksCustomMetadata tasksInProgress = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); + final Response.Builder responseBuilder = new Response.Builder(); + + // 5. Build response + ActionListener runtimeStateListener = ActionListener.wrap( + runtimeStateResponse -> { + responseBuilder.setDatafeedRuntimeState(runtimeStateResponse); + listener.onResponse(responseBuilder.build(tasksInProgress, state)); + }, + listener::onFailure + ); + + // 4. Grab runtime state + ActionListener> datafeedTimingStatsListener = ActionListener.wrap( + timingStatsByJobId -> { + responseBuilder.setTimingStatsMap(timingStatsByJobId); + client.execute( + GetDatafeedRunningStateAction.INSTANCE, + new GetDatafeedRunningStateAction.Request(responseBuilder.getDatafeedIds()), + runtimeStateListener + ); + }, + listener::onFailure + ); + + // 3. Grab timing stats + ActionListener> expandedConfigsListener = ActionListener.wrap( + datafeedBuilders -> { + Map datafeedIdsToJobIds = datafeedBuilders.stream() + .collect(Collectors.toMap(DatafeedConfig.Builder::getId, DatafeedConfig.Builder::getJobId)); + responseBuilder.setDatafeedToJobId(datafeedIdsToJobIds); + jobResultsProvider.datafeedTimingStats(new ArrayList<>(datafeedIdsToJobIds.values()), datafeedTimingStatsListener); + }, + listener::onFailure + ); + + // 2. Now that we have the ids, grab the datafeed configs ActionListener> expandIdsListener = ActionListener.wrap( expandedIds -> { + responseBuilder.setDatafeedIds(expandedIds); datafeedConfigProvider.expandDatafeedConfigs( request.getDatafeedId(), // Already took into account the request parameter when we expanded the IDs with the tasks earlier // Should allow for no datafeeds in case the config is gone true, - ActionListener.wrap( - datafeedBuilders -> { - Map existingConfigs = datafeedBuilders.stream() - .map(DatafeedConfig.Builder::build) - .collect(Collectors.toMap(DatafeedConfig::getId, Function.identity())); - - List jobIds = existingConfigs.values() - .stream() - .map(DatafeedConfig::getJobId) - .collect(Collectors.toList()); - jobResultsProvider.datafeedTimingStats( - jobIds, - ActionListener.wrap(timingStatsByJobId -> { - List results = expandedIds.stream() - .map(datafeedId -> { - DatafeedConfig config = existingConfigs.get(datafeedId); - String jobId = config == null ? null : config.getJobId(); - DatafeedTimingStats timingStats = jobId == null ? null : timingStatsByJobId.get(jobId); - return buildDatafeedStats( - datafeedId, - state, - tasksInProgress, - jobId, - timingStats - ); - }) - .collect(Collectors.toList()); - QueryPage statsPage = - new QueryPage<>(results, results.size(), DatafeedConfig.RESULTS_FIELD); - listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage)); - }, - listener::onFailure)); - }, - listener::onFailure) + expandedConfigsListener ); }, listener::onFailure ); - // This might also include datafeed tasks that exist but no longer have a config + // 1. This might also include datafeed tasks that exist but no longer have a config datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoMatch(), tasksInProgress, @@ -117,27 +133,8 @@ protected void masterOperation(Task task, GetDatafeedsStatsAction.Request reques expandIdsListener); } - private static GetDatafeedsStatsAction.Response.DatafeedStats buildDatafeedStats(String datafeedId, - ClusterState state, - PersistentTasksCustomMetadata tasks, - String jobId, - DatafeedTimingStats timingStats) { - PersistentTasksCustomMetadata.PersistentTask task = MlTasks.getDatafeedTask(datafeedId, tasks); - DatafeedState datafeedState = MlTasks.getDatafeedState(datafeedId, tasks); - DiscoveryNode node = null; - String explanation = null; - if (task != null) { - node = state.nodes().get(task.getExecutorNode()); - explanation = task.getAssignment().getExplanation(); - } - if (timingStats == null && jobId != null) { - timingStats = new DatafeedTimingStats(jobId); - } - return new GetDatafeedsStatsAction.Response.DatafeedStats(datafeedId, datafeedState, node, explanation, timingStats); - } - @Override - protected ClusterBlockException checkBlock(GetDatafeedsStatsAction.Request request, ClusterState state) { + protected ClusterBlockException checkBlock(Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index d3c71d495a0b9..00ada0aef5bc7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -45,6 +45,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction; import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -539,6 +540,16 @@ public void isolate() { datafeedManager.isolateDatafeed(getAllocationId()); } } + + public Optional getRunningState() { + if (datafeedManager == null) { + return Optional.empty(); + } + return Optional.of(new GetDatafeedRunningStateAction.Response.RunningState( + this.endTime == null, + datafeedManager.finishedLookBack(this) + )); + } } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 7cb3dec4bb543..676d4794b5242 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -163,6 +163,11 @@ public void isolateDatafeed(long allocationId) { } } + public boolean finishedLookBack(TransportStartDatafeedAction.DatafeedTask task) { + Holder holder = runningDatafeedsOnThisNode.get(task.getAllocationId()); + return holder != null && holder.isLookbackFinished(); + } + // Important: Holder must be created and assigned to DatafeedTask before setting state to started, // otherwise if a stop datafeed call is made immediately after the start datafeed call we could cancel // the DatafeedTask without stopping datafeed, which causes the datafeed to keep on running. @@ -210,6 +215,7 @@ protected void doRun() { holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e); return; } + holder.finishedLookback(true); if (holder.isIsolated() == false) { if (next != null) { doDatafeedRealtime(next, holder.datafeedJob.getJobId(), holder); @@ -327,6 +333,7 @@ public class Holder { private final Consumer finishHandler; volatile Scheduler.Cancellable cancellable; private volatile boolean isNodeShuttingDown; + private volatile boolean lookbackFinished; Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob, ProblemTracker problemTracker, Consumer finishHandler) { @@ -344,6 +351,10 @@ boolean shouldStopAfterEmptyData(int emptyDataCount) { return emptyDataCountToStopAt != null && emptyDataCount >= emptyDataCountToStopAt; } + private void finishedLookback(boolean value) { + this.lookbackFinished = value; + } + String getJobId() { return datafeedJob.getJobId(); } @@ -414,8 +425,13 @@ public void setNodeIsShuttingDown() { isNodeShuttingDown = true; } + public boolean isLookbackFinished() { + return lookbackFinished; + } + private Long executeLookBack(long startTime, Long endTime) throws Exception { datafeedJobLock.lock(); + lookbackFinished = false; try { if (isRunning() && isIsolated() == false) { return datafeedJob.runLookBack(startTime, endTime); diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 89cd0b41dc274..6e8d807611edb 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -218,6 +218,7 @@ public class Constants { "cluster:admin/xpack/watcher/watch/execute", "cluster:admin/xpack/watcher/watch/put", "cluster:internal/xpack/ml/datafeed/isolate", + "cluster:internal/xpack/ml/datafeed/running_state", "cluster:internal/xpack/ml/inference/infer", "cluster:internal/xpack/ml/job/finalize_job_execution", "cluster:internal/xpack/ml/job/kill/process", diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/get_datafeed_stats.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/get_datafeed_stats.yml index cbd387ae14f87..20a13a990cedf 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/get_datafeed_stats.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/get_datafeed_stats.yml @@ -152,6 +152,7 @@ setup: - match: { datafeeds.0.datafeed_id: "datafeed-1"} - match: { datafeeds.0.state: "stopped"} - is_false: datafeeds.0.node + - is_false: datafeeds.0.running_state - match: { datafeeds.0.timing_stats.job_id: "get-datafeed-stats-1" } - match: { datafeeds.0.timing_stats.search_count: 0 } - match: { datafeeds.0.timing_stats.total_search_time_ms: 0.0} @@ -162,6 +163,7 @@ setup: - match: { datafeeds.0.datafeed_id: "datafeed-2"} - match: { datafeeds.0.state: "stopped"} - is_false: datafeeds.0.node + - is_false: datafeeds.0.running_state - match: { datafeeds.0.timing_stats.job_id: "get-datafeed-stats-2" } - match: { datafeeds.0.timing_stats.search_count: 0 } - match: { datafeeds.0.timing_stats.total_search_time_ms: 0.0} @@ -185,6 +187,8 @@ setup: - match: { datafeeds.0.state: "started"} - is_true: datafeeds.0.node.name - is_true: datafeeds.0.node.transport_address + - is_true: datafeeds.0.running_state + - is_true: datafeeds.0.running_state.real_time_configured --- "Test get stats for started datafeed contains timing stats":