[ML] adding running_state to datafeed stats object #73926

Merged
merged 4 commits into from
Jun 10, 2021
@@ -24,7 +24,7 @@ Retrieves usage information for {dfeeds}.
[[ml-get-datafeed-stats-prereqs]]
== {api-prereq-title}

Requires the `monitor_ml` cluster privilege. This privilege is included in the
Requires the `monitor_ml` cluster privilege. This privilege is included in the
`machine_learning_user` built-in role.

[[ml-get-datafeed-stats-desc]]
@@ -103,6 +103,24 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=node-transport-address]
====
--

`running_state`::
(object) An object containing the running state for this {dfeed}. It is only
provided if the {dfeed} is started.
+
--
[%collapsible%open]
====
`is_real_time`:::
(boolean) Indicates if the {dfeed} is "real-time"; meaning that the {dfeed}
has no configured `end` time.

`finished_look_back`:::
Contributor

Our existing docs don't use the term "lookback". I am not sure we should introduce it now.

One solution would be to rename is_real_time to real_time_configured and finished_look_back to real_time_running. Then that doesn't involve introducing a new public term.

Member Author

@droberts195

While it's true that we don't use it in docs, lookback is used in audit messages:

    public static final String JOB_AUDIT_DATAFEED_LOOKBACK_COMPLETED = "Datafeed lookback completed";
    public static final String JOB_AUDIT_DATAFEED_LOOKBACK_NO_DATA = "Datafeed lookback retrieved no data";

Member Author

I am happy to change both to indicate real_time, but we do use lookback in user-facing things.

Contributor

> lookback is used in audit messages

Oh, interesting. I think we made the decision in 2016 not to use it anywhere user-facing, but those audit messages were added in 2017, by which time the 2016 decision had been forgotten. I guess the lesson is that we should change our terminology in internal code as well as what's immediately user-facing to stop internal terminology leaking out later on.

I still think there is a benefit in not propagating the term to fields that will appear in every high level client's public API. At the moment we could change the wording of those audit messages without making a breaking change, but the REST responses are harder to change once published.

Member Author

okey dokey, I will rename the variables.

> I guess the lesson is that we should change our terminology in internal code as well as what's immediately user-facing to stop internal terminology leaking out later on.

100%. Renaming things everywhere prevents this sort of thing.

(boolean) Has the {dfeed} finished running on the available past data. For {dfeeds}
Contributor

Suggested change
(boolean) Has the {dfeed} finished running on the available past data. For {dfeeds}
(boolean) Indicates whether the {dfeed} has finished running on the available past data. For {dfeeds}

that without a configured `end` time, this means that the {dfeed} is now running on
Contributor

Suggested change
that without a configured `end` time, this means that the {dfeed} is now running on
without a configured `end` time, this means that the {dfeed} is now running on

"real-time" data.
====
--

`state`::
(string)
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=state-datafeed]
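
The two `running_state` flags documented in this diff can be read together to tell which phase a started {dfeed} is in. The following is a minimal sketch of that reading, assuming the field semantics exactly as worded in the diff. The class name `RunningStatePhase` and the method `describe` are hypothetical and are not part of this PR.

```java
public class RunningStatePhase {

    /**
     * Hypothetical helper (not part of the PR): maps the two documented flags
     * to a short human-readable description.
     *
     * @param isRealTime       true when the datafeed has no configured end time
     * @param finishedLookBack true when the datafeed has finished running on the available past data
     */
    public static String describe(boolean isRealTime, boolean finishedLookBack) {
        if (finishedLookBack == false) {
            // Past data is still being processed, whether or not an end time is configured.
            return "still processing available past data";
        }
        if (isRealTime) {
            // Per the docs text: without a configured end time, finishing the
            // past data means the datafeed is now running on "real-time" data.
            return "running on real-time data";
        }
        return "finished the available past data; an end time is configured";
    }

    public static void main(String[] args) {
        System.out.println(describe(true, false));
        System.out.println(describe(true, true));
        System.out.println(describe(false, true));
    }
}
```

Because the docs state that `running_state` is only provided when the {dfeed} is started, a caller would presumably need to handle its absence before applying a helper like this.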
@@ -0,0 +1,170 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ml.MlTasks;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;


/**
 * Internal only action to get the current running state of a datafeed
 */
public class GetDatafeedRunningStateAction extends ActionType<GetDatafeedRunningStateAction.Response> {

    public static final GetDatafeedRunningStateAction INSTANCE = new GetDatafeedRunningStateAction();
    public static final String NAME = "cluster:internal/xpack/ml/datafeed/running_state";

    private GetDatafeedRunningStateAction() {
        super(NAME, GetDatafeedRunningStateAction.Response::new);
    }

    public static class Request extends BaseTasksRequest<Request> {

        private final Set<String> datafeedTaskIds;

        public Request(List<String> datafeedIds) {
            this.datafeedTaskIds = datafeedIds.stream().map(MlTasks::datafeedTaskId).collect(Collectors.toSet());
        }

        public Request(StreamInput in) throws IOException {
            super(in);
            this.datafeedTaskIds = in.readSet(StreamInput::readString);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeStringCollection(datafeedTaskIds);
        }

        public Set<String> getDatafeedTaskIds() {
            return datafeedTaskIds;
        }

        @Override
        public boolean match(Task task) {
            return task instanceof StartDatafeedAction.DatafeedTaskMatcher && datafeedTaskIds.contains(task.getDescription());
        }
    }

    public static class Response extends BaseTasksResponse {

        public static class RunningState implements Writeable, ToXContentObject {

            // Is the datafeed a "realtime" datafeed, meaning it was started without an end_time
            private final boolean isRealTime;
            // Has the look back finished.
            private final boolean finishedLookBack;

            public RunningState(boolean isRealTime, boolean finishedLookBack) {
                this.isRealTime = isRealTime;
                this.finishedLookBack = finishedLookBack;
            }

            public RunningState(StreamInput in) throws IOException {
                this.isRealTime = in.readBoolean();
                this.finishedLookBack = in.readBoolean();
            }

            @Override
            public boolean equals(Object o) {
                if (this == o) return true;
                if (o == null || getClass() != o.getClass()) return false;
                RunningState that = (RunningState) o;
                return isRealTime == that.isRealTime && finishedLookBack == that.finishedLookBack;
            }

            @Override
            public int hashCode() {
                return Objects.hash(isRealTime, finishedLookBack);
            }

            @Override
            public void writeTo(StreamOutput out) throws IOException {
                out.writeBoolean(isRealTime);
                out.writeBoolean(finishedLookBack);
            }

            @Override
            public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
                builder.startObject();
                builder.field("is_real_time", isRealTime);
                builder.field("finished_look_back", finishedLookBack);
                builder.endObject();
                return builder;
            }
        }

        private final Map<String, RunningState> datafeedRunningState;

        public static Response fromResponses(List<Response> responses) {
            return new Response(responses.stream()
                .flatMap(r -> r.datafeedRunningState.entrySet().stream())
                .filter(entry -> entry.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
        }

        public static Response fromTaskAndState(String datafeedId, RunningState runningState) {
            return new Response(MapBuilder.<String, RunningState>newMapBuilder().put(datafeedId, runningState).map());
        }

        public Response(StreamInput in) throws IOException {
            super(in);
            datafeedRunningState = in.readMap(StreamInput::readString, RunningState::new);
        }

        public Response(Map<String, RunningState> runtimeStateMap) {
            super(null, null);
            this.datafeedRunningState = runtimeStateMap;
        }

        public Optional<RunningState> getRunningState(String datafeedId) {
            return Optional.ofNullable(datafeedRunningState.get(datafeedId));
        }

        public Map<String, RunningState> getDatafeedRunningState() {
            return datafeedRunningState;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeMap(datafeedRunningState, StreamOutput::writeString, (o, w) -> w.writeTo(o));
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Response response = (Response) o;
            return Objects.equals(this.datafeedRunningState, response.datafeedRunningState);
        }

        @Override
        public int hashCode() {
            return Objects.hash(datafeedRunningState);
        }
    }

}
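
The merge performed by `fromResponses` and the lookup in `getRunningState` can be tried without any Elasticsearch classes. The sketch below is a simplified stand-in that uses only the JDK. `RunningStateMergeSketch`, `State`, `merge`, and `lookup` are hypothetical names for illustration, and `State` only mimics the two booleans of `RunningState`. One reason the null filter matters, presumably, is that `Collectors.toMap` throws a `NullPointerException` on null values. It also throws an `IllegalStateException` on duplicate keys, so this pattern assumes each datafeed is reported by at most one response.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class RunningStateMergeSketch {

    /** Simplified stand-in for Response.RunningState; not the Elasticsearch class. */
    public static final class State {
        final boolean isRealTime;
        final boolean finishedLookBack;

        public State(boolean isRealTime, boolean finishedLookBack) {
            this.isRealTime = isRealTime;
            this.finishedLookBack = finishedLookBack;
        }
    }

    /** Flattens the per-response maps into one map, dropping entries whose state is null. */
    public static Map<String, State> merge(List<Map<String, State>> perResponse) {
        return perResponse.stream()
            .flatMap(m -> m.entrySet().stream())
            // Without this filter, Collectors.toMap would throw on a null value.
            .filter(entry -> entry.getValue() != null)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /** Mirrors the Optional-returning lookup: empty when the datafeed has no state. */
    public static Optional<State> lookup(Map<String, State> merged, String datafeedId) {
        return Optional.ofNullable(merged.get(datafeedId));
    }

    public static void main(String[] args) {
        Map<String, State> first = Collections.singletonMap("feed-a", new State(true, false));
        Map<String, State> second = Collections.singletonMap("feed-b", null);

        Map<String, State> merged = merge(Arrays.asList(first, second));

        System.out.println("feed-a present: " + lookup(merged, "feed-a").isPresent());
        System.out.println("feed-b present: " + lookup(merged, "feed-b").isPresent());
    }
}
```

Returning `Optional` from the lookup lets a caller omit `running_state` for datafeeds that have no entry, which fits the docs statement that the object is only provided when the datafeed is started.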