Skip to content

Commit 2dd058d

Browse files
authored
HLRC: ML Add preview datafeed api (#34284)
* HLRC: ML Add preview datafeed api * Changing deprecation handling for parser * Removing some duplication in docs, will address other APIs in another PR
1 parent dcfe64e commit 2dd058d

11 files changed

+600
-24
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.client.ml.GetRecordsRequest;
4646
import org.elasticsearch.client.ml.OpenJobRequest;
4747
import org.elasticsearch.client.ml.PostDataRequest;
48+
import org.elasticsearch.client.ml.PreviewDatafeedRequest;
4849
import org.elasticsearch.client.ml.PutCalendarRequest;
4950
import org.elasticsearch.client.ml.PutDatafeedRequest;
5051
import org.elasticsearch.client.ml.PutJobRequest;
@@ -259,6 +260,17 @@ static Request stopDatafeed(StopDatafeedRequest stopDatafeedRequest) throws IOEx
259260
return request;
260261
}
261262

263+
static Request previewDatafeed(PreviewDatafeedRequest previewDatafeedRequest) {
264+
String endpoint = new EndpointBuilder()
265+
.addPathPartAsIs("_xpack")
266+
.addPathPartAsIs("ml")
267+
.addPathPartAsIs("datafeeds")
268+
.addPathPart(previewDatafeedRequest.getDatafeedId())
269+
.addPathPartAsIs("_preview")
270+
.build();
271+
return new Request(HttpGet.METHOD_NAME, endpoint);
272+
}
273+
262274
static Request deleteForecast(DeleteForecastRequest deleteForecastRequest) {
263275
String endpoint = new EndpointBuilder()
264276
.addPathPartAsIs("_xpack")

client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import org.elasticsearch.client.ml.OpenJobResponse;
5353
import org.elasticsearch.client.ml.PostDataRequest;
5454
import org.elasticsearch.client.ml.PostDataResponse;
55+
import org.elasticsearch.client.ml.PreviewDatafeedRequest;
56+
import org.elasticsearch.client.ml.PreviewDatafeedResponse;
5557
import org.elasticsearch.client.ml.PutCalendarRequest;
5658
import org.elasticsearch.client.ml.PutCalendarResponse;
5759
import org.elasticsearch.client.ml.PutDatafeedRequest;
@@ -649,6 +651,49 @@ public void stopDatafeedAsync(StopDatafeedRequest request, RequestOptions option
649651
Collections.emptySet());
650652
}
651653

654+
/**
655+
* Previews the given Machine Learning Datafeed
656+
* <p>
657+
* For additional info
658+
* see <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-preview-datafeed.html">
659+
* ML Preview Datafeed documentation</a>
660+
*
661+
* @param request The request to preview the datafeed
662+
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
663+
* @return {@link PreviewDatafeedResponse} object containing a {@link org.elasticsearch.common.bytes.BytesReference} of the data in
664+
* JSON format
665+
* @throws IOException when there is a serialization issue sending the request or receiving the response
666+
*/
667+
public PreviewDatafeedResponse previewDatafeed(PreviewDatafeedRequest request, RequestOptions options) throws IOException {
668+
return restHighLevelClient.performRequestAndParseEntity(request,
669+
MLRequestConverters::previewDatafeed,
670+
options,
671+
PreviewDatafeedResponse::fromXContent,
672+
Collections.emptySet());
673+
}
674+
675+
/**
676+
* Previews the given Machine Learning Datafeed asynchronously and notifies the listener on completion
677+
* <p>
678+
* For additional info
679+
* see <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-preview-datafeed.html">
680+
* ML Preview Datafeed documentation</a>
681+
*
682+
* @param request The request to preview the datafeed
683+
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
684+
* @param listener Listener to be notified upon request completion
685+
*/
686+
public void previewDatafeedAsync(PreviewDatafeedRequest request,
687+
RequestOptions options,
688+
ActionListener<PreviewDatafeedResponse> listener) {
689+
restHighLevelClient.performRequestAsyncAndParseEntity(request,
690+
MLRequestConverters::previewDatafeed,
691+
options,
692+
PreviewDatafeedResponse::fromXContent,
693+
listener,
694+
Collections.emptySet());
695+
}
696+
652697
/**
653698
* Updates a Machine Learning {@link org.elasticsearch.client.ml.job.config.Job}
654699
* <p>
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.ml;
20+
21+
import org.elasticsearch.action.ActionRequest;
22+
import org.elasticsearch.action.ActionRequestValidationException;
23+
import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
24+
import org.elasticsearch.common.Strings;
25+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
26+
import org.elasticsearch.common.xcontent.ToXContentObject;
27+
import org.elasticsearch.common.xcontent.XContentBuilder;
28+
import org.elasticsearch.common.xcontent.XContentParser;
29+
30+
import java.io.IOException;
31+
import java.util.Objects;
32+
33+
/**
34+
* Request to preview a MachineLearning Datafeed
35+
*/
36+
public class PreviewDatafeedRequest extends ActionRequest implements ToXContentObject {
37+
38+
public static final ConstructingObjectParser<PreviewDatafeedRequest, Void> PARSER = new ConstructingObjectParser<>(
39+
"open_datafeed_request", true, a -> new PreviewDatafeedRequest((String) a[0]));
40+
41+
static {
42+
PARSER.declareString(ConstructingObjectParser.constructorArg(), DatafeedConfig.ID);
43+
}
44+
45+
public static PreviewDatafeedRequest fromXContent(XContentParser parser) throws IOException {
46+
return PARSER.parse(parser, null);
47+
}
48+
49+
private final String datafeedId;
50+
51+
/**
52+
* Create a new request with the desired datafeedId
53+
*
54+
* @param datafeedId unique datafeedId, must not be null
55+
*/
56+
public PreviewDatafeedRequest(String datafeedId) {
57+
this.datafeedId = Objects.requireNonNull(datafeedId, "[datafeed_id] must not be null");
58+
}
59+
60+
public String getDatafeedId() {
61+
return datafeedId;
62+
}
63+
64+
@Override
65+
public ActionRequestValidationException validate() {
66+
return null;
67+
}
68+
69+
@Override
70+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
71+
builder.startObject();
72+
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
73+
builder.endObject();
74+
return builder;
75+
}
76+
77+
@Override
78+
public String toString() {
79+
return Strings.toString(this);
80+
}
81+
82+
@Override
83+
public int hashCode() {
84+
return Objects.hash(datafeedId);
85+
}
86+
87+
@Override
88+
public boolean equals(Object other) {
89+
if (this == other) {
90+
return true;
91+
}
92+
93+
if (other == null || getClass() != other.getClass()) {
94+
return false;
95+
}
96+
97+
PreviewDatafeedRequest that = (PreviewDatafeedRequest) other;
98+
return Objects.equals(datafeedId, that.datafeedId);
99+
}
100+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.ml;
20+
21+
import org.elasticsearch.action.ActionResponse;
22+
import org.elasticsearch.common.Strings;
23+
import org.elasticsearch.common.bytes.BytesReference;
24+
import org.elasticsearch.common.io.stream.StreamInput;
25+
import org.elasticsearch.common.xcontent.DeprecationHandler;
26+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
27+
import org.elasticsearch.common.xcontent.ToXContentObject;
28+
import org.elasticsearch.common.xcontent.XContentBuilder;
29+
import org.elasticsearch.common.xcontent.XContentFactory;
30+
import org.elasticsearch.common.xcontent.XContentParser;
31+
import org.elasticsearch.common.xcontent.XContentType;
32+
33+
import java.io.IOException;
34+
import java.io.InputStream;
35+
import java.util.Collections;
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.Objects;
39+
import java.util.stream.Collectors;
40+
41+
/**
42+
* Response containing a datafeed preview in JSON format
43+
*/
44+
public class PreviewDatafeedResponse extends ActionResponse implements ToXContentObject {
45+
46+
private BytesReference preview;
47+
48+
public static PreviewDatafeedResponse fromXContent(XContentParser parser) throws IOException {
49+
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
50+
parser.nextToken();
51+
builder.copyCurrentStructure(parser);
52+
return new PreviewDatafeedResponse(BytesReference.bytes(builder));
53+
}
54+
}
55+
56+
public PreviewDatafeedResponse(BytesReference preview) {
57+
this.preview = preview;
58+
}
59+
60+
public BytesReference getPreview() {
61+
return preview;
62+
}
63+
64+
/**
65+
* Parses the preview to a list of {@link Map} objects
66+
* @return List of previewed data
67+
* @throws IOException If there is a parsing issue with the {@link BytesReference}
68+
* @throws java.lang.ClassCastException If casting the raw {@link Object} entries to a {@link Map} fails
69+
*/
70+
@SuppressWarnings("unchecked")
71+
public List<Map<String, Object>> getDataList() throws IOException {
72+
try(StreamInput streamInput = preview.streamInput();
73+
XContentParser parser = XContentType.JSON.xContent()
74+
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, streamInput)) {
75+
XContentParser.Token token = parser.nextToken();
76+
if (token == XContentParser.Token.START_ARRAY) {
77+
return parser.listOrderedMap().stream().map(obj -> (Map<String, Object>)obj).collect(Collectors.toList());
78+
} else {
79+
return Collections.singletonList(parser.mapOrdered());
80+
}
81+
}
82+
}
83+
84+
@Override
85+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
86+
try (InputStream stream = preview.streamInput()) {
87+
builder.rawValue(stream, XContentType.JSON);
88+
}
89+
return builder;
90+
}
91+
92+
@Override
93+
public int hashCode() {
94+
return Objects.hash(preview);
95+
}
96+
97+
@Override
98+
public boolean equals(Object obj) {
99+
if (obj == null) {
100+
return false;
101+
}
102+
if (getClass() != obj.getClass()) {
103+
return false;
104+
}
105+
PreviewDatafeedResponse other = (PreviewDatafeedResponse) obj;
106+
return Objects.equals(preview, other.preview);
107+
}
108+
109+
@Override
110+
public final String toString() {
111+
return Strings.toString(this);
112+
}
113+
}

client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.client.ml.GetRecordsRequest;
4242
import org.elasticsearch.client.ml.OpenJobRequest;
4343
import org.elasticsearch.client.ml.PostDataRequest;
44+
import org.elasticsearch.client.ml.PreviewDatafeedRequest;
4445
import org.elasticsearch.client.ml.PutCalendarRequest;
4546
import org.elasticsearch.client.ml.PutDatafeedRequest;
4647
import org.elasticsearch.client.ml.PutJobRequest;
@@ -293,6 +294,13 @@ public void testStopDatafeed() throws Exception {
293294
}
294295
}
295296

297+
public void testPreviewDatafeed() {
298+
PreviewDatafeedRequest datafeedRequest = new PreviewDatafeedRequest("datafeed_1");
299+
Request request = MLRequestConverters.previewDatafeed(datafeedRequest);
300+
assertEquals(HttpGet.METHOD_NAME, request.getMethod());
301+
assertEquals("/_xpack/ml/datafeeds/" + datafeedRequest.getDatafeedId() + "/_preview", request.getEndpoint());
302+
}
303+
296304
public void testDeleteForecast() {
297305
String jobId = randomAlphaOfLength(10);
298306
DeleteForecastRequest deleteForecastRequest = new DeleteForecastRequest(jobId);

client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import org.elasticsearch.client.ml.OpenJobResponse;
5050
import org.elasticsearch.client.ml.PostDataRequest;
5151
import org.elasticsearch.client.ml.PostDataResponse;
52+
import org.elasticsearch.client.ml.PreviewDatafeedRequest;
53+
import org.elasticsearch.client.ml.PreviewDatafeedResponse;
5254
import org.elasticsearch.client.ml.PutCalendarRequest;
5355
import org.elasticsearch.client.ml.PutCalendarResponse;
5456
import org.elasticsearch.client.ml.PutDatafeedRequest;
@@ -76,8 +78,11 @@
7678
import org.junit.After;
7779

7880
import java.io.IOException;
81+
import java.util.ArrayList;
7982
import java.util.Arrays;
83+
import java.util.Collections;
8084
import java.util.HashMap;
85+
import java.util.List;
8186
import java.util.Map;
8287
import java.util.concurrent.TimeUnit;
8388
import java.util.stream.Collectors;
@@ -564,6 +569,56 @@ public void testStopDatafeed() throws Exception {
564569
}
565570
}
566571

572+
public void testPreviewDatafeed() throws Exception {
573+
String jobId = "test-preview-datafeed";
574+
String indexName = "preview_data_1";
575+
576+
// Set up the index and docs
577+
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
578+
createIndexRequest.mapping("doc", "timestamp", "type=date", "total", "type=long");
579+
highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
580+
BulkRequest bulk = new BulkRequest();
581+
bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
582+
long now = (System.currentTimeMillis()/1000)*1000;
583+
long thePast = now - 60000;
584+
int i = 0;
585+
List<Integer> totalTotals = new ArrayList<>(60);
586+
while(thePast < now) {
587+
Integer total = randomInt(1000);
588+
IndexRequest doc = new IndexRequest();
589+
doc.index(indexName);
590+
doc.type("doc");
591+
doc.id("id" + i);
592+
doc.source("{\"total\":" + total + ",\"timestamp\":"+ thePast +"}", XContentType.JSON);
593+
bulk.add(doc);
594+
thePast += 1000;
595+
i++;
596+
totalTotals.add(total);
597+
}
598+
highLevelClient().bulk(bulk, RequestOptions.DEFAULT);
599+
600+
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
601+
// create the job and the datafeed
602+
Job job = buildJob(jobId);
603+
putJob(job);
604+
openJob(job);
605+
606+
String datafeedId = jobId + "-feed";
607+
DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, jobId)
608+
.setIndices(indexName)
609+
.setQueryDelay(TimeValue.timeValueSeconds(1))
610+
.setTypes(Collections.singletonList("doc"))
611+
.setFrequency(TimeValue.timeValueSeconds(1)).build();
612+
machineLearningClient.putDatafeed(new PutDatafeedRequest(datafeed), RequestOptions.DEFAULT);
613+
614+
PreviewDatafeedResponse response = execute(new PreviewDatafeedRequest(datafeedId),
615+
machineLearningClient::previewDatafeed,
616+
machineLearningClient::previewDatafeedAsync);
617+
618+
Integer[] totals = response.getDataList().stream().map(map -> (Integer)map.get("total")).toArray(Integer[]::new);
619+
assertThat(totalTotals, containsInAnyOrder(totals));
620+
}
621+
567622
public void testDeleteForecast() throws Exception {
568623
String jobId = "test-delete-forecast";
569624

0 commit comments

Comments
 (0)