Skip to content

[ML] adding running_state to datafeed stats object #73926

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Retrieves usage information for {dfeeds}.
[[ml-get-datafeed-stats-prereqs]]
== {api-prereq-title}

Requires the `monitor_ml` cluster privilege. This privilege is included in the
Requires the `monitor_ml` cluster privilege. This privilege is included in the
`machine_learning_user` built-in role.

[[ml-get-datafeed-stats-desc]]
Expand Down Expand Up @@ -103,6 +103,24 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=node-transport-address]
====
--

`running_state`::
(object) An object containing the running state for this {dfeed}. It is only
provided if the {dfeed} is started.
+
--
[%collapsible%open]
====
`real_time_configured`:::
(boolean) Indicates if the {dfeed} is "real-time"; meaning that the {dfeed}
has no configured `end` time.

`real_time_running`:::
(boolean) Indicates whether the {dfeed} has finished running on the available
past data. For {dfeeds} without a configured `end` time, this means that
the {dfeed} is now running on "real-time" data.
====
--

`state`::
(string)
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=state-datafeed]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ml.MlTasks;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;


/**
* Internal only action to get the current running state of a datafeed
*/
public class GetDatafeedRunningStateAction extends ActionType<GetDatafeedRunningStateAction.Response> {

public static final GetDatafeedRunningStateAction INSTANCE = new GetDatafeedRunningStateAction();
public static final String NAME = "cluster:internal/xpack/ml/datafeed/running_state";

private GetDatafeedRunningStateAction() {
super(NAME, GetDatafeedRunningStateAction.Response::new);
}

public static class Request extends BaseTasksRequest<Request> {

private final Set<String> datafeedTaskIds;

public Request(List<String> datafeedIds) {
this.datafeedTaskIds = datafeedIds.stream().map(MlTasks::datafeedTaskId).collect(Collectors.toSet());
}

public Request(StreamInput in) throws IOException {
super(in);
this.datafeedTaskIds = in.readSet(StreamInput::readString);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringCollection(datafeedTaskIds);
}

public Set<String> getDatafeedTaskIds() {
return datafeedTaskIds;
}

@Override
public boolean match(Task task) {
return task instanceof StartDatafeedAction.DatafeedTaskMatcher && datafeedTaskIds.contains(task.getDescription());
}
}

public static class Response extends BaseTasksResponse {

public static class RunningState implements Writeable, ToXContentObject {

// Is the datafeed a "realtime" datafeed, meaning it was started without an end_time
private final boolean realTimeConfigured;
// Has the look back finished and are we now running on "real-time" data
private final boolean realTimeRunning;

public RunningState(boolean realTimeConfigured, boolean realTimeRunning) {
this.realTimeConfigured = realTimeConfigured;
this.realTimeRunning = realTimeRunning;
}

public RunningState(StreamInput in) throws IOException {
this.realTimeConfigured = in.readBoolean();
this.realTimeRunning = in.readBoolean();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RunningState that = (RunningState) o;
return realTimeConfigured == that.realTimeConfigured && realTimeRunning == that.realTimeRunning;
}

@Override
public int hashCode() {
return Objects.hash(realTimeConfigured, realTimeRunning);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(realTimeConfigured);
out.writeBoolean(realTimeRunning);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("real_time_configured", realTimeConfigured);
builder.field("real_time_running", realTimeRunning);
builder.endObject();
return builder;
}
}

private final Map<String, RunningState> datafeedRunningState;

public static Response fromResponses(List<Response> responses) {
return new Response(responses.stream()
.flatMap(r -> r.datafeedRunningState.entrySet().stream())
.filter(entry -> entry.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}

public static Response fromTaskAndState(String datafeedId, RunningState runningState) {
return new Response(MapBuilder.<String, RunningState>newMapBuilder().put(datafeedId, runningState).map());
}

public Response(StreamInput in) throws IOException {
super(in);
datafeedRunningState = in.readMap(StreamInput::readString, RunningState::new);
}

public Response(Map<String, RunningState> runtimeStateMap) {
super(null, null);
this.datafeedRunningState = runtimeStateMap;
}

public Optional<RunningState> getRunningState(String datafeedId) {
return Optional.ofNullable(datafeedRunningState.get(datafeedId));
}

public Map<String, RunningState> getDatafeedRunningState() {
return datafeedRunningState;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(datafeedRunningState, StreamOutput::writeString, (o, w) -> w.writeTo(o));
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return Objects.equals(this.datafeedRunningState, response.datafeedRunningState);
}

@Override
public int hashCode() {
return Objects.hash(datafeedRunningState);
}
}

}
Loading