Skip to content

Commit ff479e7

Browse files
committed
[ML] adding running_state to datafeed stats object
1 parent 58e51a0 commit ff479e7

File tree

13 files changed

+676
-89
lines changed

13 files changed

+676
-89
lines changed

docs/reference/ml/anomaly-detection/apis/get-datafeed-stats.asciidoc

+19-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ Retrieves usage information for {dfeeds}.
2424
[[ml-get-datafeed-stats-prereqs]]
2525
== {api-prereq-title}
2626

27-
Requires the `monitor_ml` cluster privilege. This privilege is included in the
27+
Requires the `monitor_ml` cluster privilege. This privilege is included in the
2828
`machine_learning_user` built-in role.
2929

3030
[[ml-get-datafeed-stats-desc]]
@@ -103,6 +103,24 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=node-transport-address]
103103
====
104104
--
105105

106+
`running_state`::
107+
(object) An object containing the running state for this {dfeed}. It is only
108+
provided if the {dfeed} is started.
109+
+
110+
--
111+
[%collapsible%open]
112+
====
113+
`is_real_time`:::
114+
(boolean) Indicates if the {dfeed} is "real-time"; meaning that the {dfeed}
115+
has no configured `end` time.
116+
117+
`finished_look_back`:::
118+
(boolean) Has the {dfeed} finished running on the available past data. For {dfeeds}
119+
that without a configured `end` time, this means that the {dfeed} is now running on
120+
"real-time" data.
121+
====
122+
--
123+
106124
`state`::
107125
(string)
108126
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=state-datafeed]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.core.ml.action;
8+
9+
import org.elasticsearch.action.ActionType;
10+
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
11+
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
12+
import org.elasticsearch.common.collect.MapBuilder;
13+
import org.elasticsearch.common.io.stream.StreamInput;
14+
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.common.io.stream.Writeable;
16+
import org.elasticsearch.common.xcontent.ToXContentObject;
17+
import org.elasticsearch.common.xcontent.XContentBuilder;
18+
import org.elasticsearch.tasks.Task;
19+
import org.elasticsearch.xpack.core.ml.MlTasks;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.Objects;
25+
import java.util.Optional;
26+
import java.util.Set;
27+
import java.util.stream.Collectors;
28+
29+
30+
/**
31+
* Internal only action to get the current running state of a datafeed
32+
*/
33+
public class GetDatafeedRunningStateAction extends ActionType<GetDatafeedRunningStateAction.Response> {
34+
35+
public static final GetDatafeedRunningStateAction INSTANCE = new GetDatafeedRunningStateAction();
36+
public static final String NAME = "cluster:internal/xpack/ml/datafeed/running_state";
37+
38+
private GetDatafeedRunningStateAction() {
39+
super(NAME, GetDatafeedRunningStateAction.Response::new);
40+
}
41+
42+
public static class Request extends BaseTasksRequest<Request> {
43+
44+
private final Set<String> datafeedTaskIds;
45+
46+
public Request(List<String> datafeedIds) {
47+
this.datafeedTaskIds = datafeedIds.stream().map(MlTasks::datafeedTaskId).collect(Collectors.toSet());
48+
}
49+
50+
public Request(StreamInput in) throws IOException {
51+
super(in);
52+
this.datafeedTaskIds = in.readSet(StreamInput::readString);
53+
}
54+
55+
@Override
56+
public void writeTo(StreamOutput out) throws IOException {
57+
super.writeTo(out);
58+
out.writeStringCollection(datafeedTaskIds);
59+
}
60+
61+
public Set<String> getDatafeedTaskIds() {
62+
return datafeedTaskIds;
63+
}
64+
65+
@Override
66+
public boolean match(Task task) {
67+
return task instanceof StartDatafeedAction.DatafeedTaskMatcher && datafeedTaskIds.contains(task.getDescription());
68+
}
69+
}
70+
71+
public static class Response extends BaseTasksResponse {
72+
73+
public static class RunningState implements Writeable, ToXContentObject {
74+
75+
// Is the datafeed a "realtime" datafeed, meaning it was started without an end_time
76+
private final boolean isRealTime;
77+
// Has the look back finished.
78+
private final boolean finishedLookBack;
79+
80+
public RunningState(boolean isRealTime, boolean finishedLookBack) {
81+
this.isRealTime = isRealTime;
82+
this.finishedLookBack = finishedLookBack;
83+
}
84+
85+
public RunningState(StreamInput in) throws IOException {
86+
this.isRealTime = in.readBoolean();
87+
this.finishedLookBack = in.readBoolean();
88+
}
89+
90+
@Override
91+
public boolean equals(Object o) {
92+
if (this == o) return true;
93+
if (o == null || getClass() != o.getClass()) return false;
94+
RunningState that = (RunningState) o;
95+
return isRealTime == that.isRealTime && finishedLookBack == that.finishedLookBack;
96+
}
97+
98+
@Override
99+
public int hashCode() {
100+
return Objects.hash(isRealTime, finishedLookBack);
101+
}
102+
103+
@Override
104+
public void writeTo(StreamOutput out) throws IOException {
105+
out.writeBoolean(isRealTime);
106+
out.writeBoolean(finishedLookBack);
107+
}
108+
109+
@Override
110+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
111+
builder.startObject();
112+
builder.field("is_real_time", isRealTime);
113+
builder.field("finished_look_back", finishedLookBack);
114+
builder.endObject();
115+
return builder;
116+
}
117+
}
118+
119+
private final Map<String, RunningState> datafeedRunningState;
120+
121+
public static Response fromResponses(List<Response> responses) {
122+
return new Response(responses.stream()
123+
.flatMap(r -> r.datafeedRunningState.entrySet().stream())
124+
.filter(entry -> entry.getValue() != null)
125+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
126+
}
127+
128+
public static Response fromTaskAndState(String datafeedId, RunningState runningState) {
129+
return new Response(MapBuilder.<String, RunningState>newMapBuilder().put(datafeedId, runningState).map());
130+
}
131+
132+
public Response(StreamInput in) throws IOException {
133+
super(in);
134+
datafeedRunningState = in.readMap(StreamInput::readString, RunningState::new);
135+
}
136+
137+
public Response(Map<String, RunningState> runtimeStateMap) {
138+
super(null, null);
139+
this.datafeedRunningState = runtimeStateMap;
140+
}
141+
142+
public Optional<RunningState> getRunningState(String datafeedId) {
143+
return Optional.ofNullable(datafeedRunningState.get(datafeedId));
144+
}
145+
146+
public Map<String, RunningState> getDatafeedRunningState() {
147+
return datafeedRunningState;
148+
}
149+
150+
@Override
151+
public void writeTo(StreamOutput out) throws IOException {
152+
super.writeTo(out);
153+
out.writeMap(datafeedRunningState, StreamOutput::writeString, (o, w) -> w.writeTo(o));
154+
}
155+
156+
@Override
157+
public boolean equals(Object o) {
158+
if (this == o) return true;
159+
if (o == null || getClass() != o.getClass()) return false;
160+
Response response = (Response) o;
161+
return Objects.equals(this.datafeedRunningState, response.datafeedRunningState);
162+
}
163+
164+
@Override
165+
public int hashCode() {
166+
return Objects.hash(datafeedRunningState);
167+
}
168+
}
169+
170+
}

0 commit comments

Comments
 (0)