Skip to content

Commit 73ba70d

Browse files
authored
[ML-DataFrame] Add _preview endpoint (#38924)
* [DATA-FRAME] add preview endpoint * adjusting preview tests and fixing parser * adjusing preview transport * remove unused import * adjusting test * Addressing PR comments * Fixing failing test and adjusting for pr comments * fixing integration test
1 parent 1e7f28e commit 73ba70d

File tree

9 files changed

+490
-8
lines changed

9 files changed

+490
-8
lines changed

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@
1212
import org.junit.Before;
1313

1414
import java.io.IOException;
15+
import java.util.Arrays;
16+
import java.util.HashSet;
1517
import java.util.List;
1618
import java.util.Map;
19+
import java.util.Set;
1720
import java.util.concurrent.TimeUnit;
1821

1922
import static org.hamcrest.Matchers.equalTo;
@@ -224,6 +227,34 @@ public void testDateHistogramPivot() throws Exception {
224227
assertOnePivotValue(dataFrameIndex + "/_search?q=by_day:2017-01-15", 3.82);
225228
}
226229

230+
@SuppressWarnings("unchecked")
231+
public void testPreviewTransform() throws Exception {
232+
final Request createPreviewRequest = new Request("POST", DATAFRAME_ENDPOINT + "_preview");
233+
234+
String config = "{"
235+
+ " \"source\": \"reviews\",";
236+
237+
config += " \"pivot\": {"
238+
+ " \"group_by\": {"
239+
+ " \"reviewer\": {\"terms\": { \"field\": \"user_id\" }},"
240+
+ " \"by_day\": {\"date_histogram\": {\"interval\": \"1d\",\"field\":\"timestamp\",\"format\":\"yyyy-MM-DD\"}}},"
241+
+ " \"aggregations\": {"
242+
+ " \"avg_rating\": {"
243+
+ " \"avg\": {"
244+
+ " \"field\": \"stars\""
245+
+ " } } } }"
246+
+ "}";
247+
createPreviewRequest.setJsonEntity(config);
248+
Map<String, Object> previewDataframeResponse = entityAsMap(client().performRequest(createPreviewRequest));
249+
List<Map<String, Object>> preview = (List<Map<String, Object>>)previewDataframeResponse.get("preview");
250+
assertThat(preview.size(), equalTo(393));
251+
Set<String> expectedFields = new HashSet<>(Arrays.asList("reviewer", "by_day", "avg_rating"));
252+
preview.forEach(p -> {
253+
Set<String> keys = p.keySet();
254+
assertThat(keys, equalTo(expectedFields));
255+
});
256+
}
257+
227258
private void startAndWaitForTransform(String transformId, String dataFrameIndex) throws IOException, Exception {
228259
// start the transform
229260
final Request startTransformRequest = new Request("POST", DATAFRAME_ENDPOINT + transformId + "/_start");

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@
5050
import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameTransformAction;
5151
import org.elasticsearch.xpack.dataframe.action.GetDataFrameTransformsAction;
5252
import org.elasticsearch.xpack.dataframe.action.GetDataFrameTransformsStatsAction;
53+
import org.elasticsearch.xpack.dataframe.action.PreviewDataFrameTransformAction;
5354
import org.elasticsearch.xpack.dataframe.action.PutDataFrameTransformAction;
5455
import org.elasticsearch.xpack.dataframe.action.StartDataFrameTransformAction;
5556
import org.elasticsearch.xpack.dataframe.action.StopDataFrameTransformAction;
5657
import org.elasticsearch.xpack.dataframe.action.TransportDeleteDataFrameTransformAction;
5758
import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsAction;
5859
import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsStatsAction;
60+
import org.elasticsearch.xpack.dataframe.action.TransportPreviewDataFrameTransformAction;
5961
import org.elasticsearch.xpack.dataframe.action.TransportPutDataFrameTransformAction;
6062
import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformAction;
6163
import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction;
@@ -64,6 +66,7 @@
6466
import org.elasticsearch.xpack.dataframe.rest.action.RestDeleteDataFrameTransformAction;
6567
import org.elasticsearch.xpack.dataframe.rest.action.RestGetDataFrameTransformsAction;
6668
import org.elasticsearch.xpack.dataframe.rest.action.RestGetDataFrameTransformsStatsAction;
69+
import org.elasticsearch.xpack.dataframe.rest.action.RestPreviewDataFrameTransformAction;
6770
import org.elasticsearch.xpack.dataframe.rest.action.RestPutDataFrameTransformAction;
6871
import org.elasticsearch.xpack.dataframe.rest.action.RestStartDataFrameTransformAction;
6972
import org.elasticsearch.xpack.dataframe.rest.action.RestStopDataFrameTransformAction;
@@ -138,7 +141,8 @@ public List<RestHandler> getRestHandlers(final Settings settings, final RestCont
138141
new RestStopDataFrameTransformAction(settings, restController),
139142
new RestDeleteDataFrameTransformAction(settings, restController),
140143
new RestGetDataFrameTransformsAction(settings, restController),
141-
new RestGetDataFrameTransformsStatsAction(settings, restController)
144+
new RestGetDataFrameTransformsStatsAction(settings, restController),
145+
new RestPreviewDataFrameTransformAction(settings, restController)
142146
);
143147
}
144148

@@ -154,7 +158,8 @@ public List<RestHandler> getRestHandlers(final Settings settings, final RestCont
154158
new ActionHandler<>(StopDataFrameTransformAction.INSTANCE, TransportStopDataFrameTransformAction.class),
155159
new ActionHandler<>(DeleteDataFrameTransformAction.INSTANCE, TransportDeleteDataFrameTransformAction.class),
156160
new ActionHandler<>(GetDataFrameTransformsAction.INSTANCE, TransportGetDataFrameTransformsAction.class),
157-
new ActionHandler<>(GetDataFrameTransformsStatsAction.INSTANCE, TransportGetDataFrameTransformsStatsAction.class)
161+
new ActionHandler<>(GetDataFrameTransformsStatsAction.INSTANCE, TransportGetDataFrameTransformsStatsAction.class),
162+
new ActionHandler<>(PreviewDataFrameTransformAction.INSTANCE, TransportPreviewDataFrameTransformAction.class)
158163
);
159164
}
160165

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.dataframe.action;
8+
9+
import org.elasticsearch.action.Action;
10+
import org.elasticsearch.action.ActionRequestValidationException;
11+
import org.elasticsearch.action.ActionResponse;
12+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
13+
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
14+
import org.elasticsearch.client.ElasticsearchClient;
15+
import org.elasticsearch.common.ParseField;
16+
import org.elasticsearch.common.bytes.BytesReference;
17+
import org.elasticsearch.common.io.stream.StreamInput;
18+
import org.elasticsearch.common.io.stream.StreamOutput;
19+
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
20+
import org.elasticsearch.common.xcontent.ObjectParser;
21+
import org.elasticsearch.common.xcontent.ToXContentObject;
22+
import org.elasticsearch.common.xcontent.XContentBuilder;
23+
import org.elasticsearch.common.xcontent.XContentFactory;
24+
import org.elasticsearch.common.xcontent.XContentParser;
25+
import org.elasticsearch.common.xcontent.XContentType;
26+
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformConfig;
27+
28+
import java.io.IOException;
29+
import java.util.ArrayList;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Objects;
33+
34+
public class PreviewDataFrameTransformAction extends Action<PreviewDataFrameTransformAction.Response> {
35+
36+
public static final PreviewDataFrameTransformAction INSTANCE = new PreviewDataFrameTransformAction();
37+
public static final String NAME = "cluster:admin/data_frame/preview";
38+
39+
private PreviewDataFrameTransformAction() {
40+
super(NAME);
41+
}
42+
43+
@Override
44+
public Response newResponse() {
45+
return new Response();
46+
}
47+
48+
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
49+
50+
private DataFrameTransformConfig config;
51+
52+
public Request(DataFrameTransformConfig config) {
53+
this.setConfig(config);
54+
}
55+
56+
public Request() { }
57+
58+
public static Request fromXContent(final XContentParser parser) throws IOException {
59+
Map<String, Object> content = parser.map();
60+
// Destination and ID are not required for Preview, so we just supply our own
61+
content.put(DataFrameTransformConfig.DESTINATION.getPreferredName(), "unused-transform-preview-index");
62+
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(content);
63+
XContentParser newParser = XContentType.JSON
64+
.xContent()
65+
.createParser(parser.getXContentRegistry(),
66+
LoggingDeprecationHandler.INSTANCE,
67+
BytesReference.bytes(xContentBuilder).streamInput())) {
68+
return new Request(DataFrameTransformConfig.fromXContent(newParser, "transform-preview", true));
69+
}
70+
}
71+
72+
@Override
73+
public ActionRequestValidationException validate() {
74+
return null;
75+
}
76+
77+
@Override
78+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
79+
return this.config.toXContent(builder, params);
80+
}
81+
82+
public DataFrameTransformConfig getConfig() {
83+
return config;
84+
}
85+
86+
public void setConfig(DataFrameTransformConfig config) {
87+
this.config = config;
88+
}
89+
90+
@Override
91+
public void readFrom(StreamInput in) throws IOException {
92+
super.readFrom(in);
93+
this.config = new DataFrameTransformConfig(in);
94+
}
95+
96+
@Override
97+
public void writeTo(StreamOutput out) throws IOException {
98+
super.writeTo(out);
99+
this.config.writeTo(out);
100+
}
101+
102+
@Override
103+
public int hashCode() {
104+
return Objects.hash(config);
105+
}
106+
107+
@Override
108+
public boolean equals(Object obj) {
109+
if (obj == this) {
110+
return true;
111+
}
112+
if (obj == null || getClass() != obj.getClass()) {
113+
return false;
114+
}
115+
Request other = (Request) obj;
116+
return Objects.equals(config, other.config);
117+
}
118+
}
119+
120+
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
121+
122+
protected RequestBuilder(ElasticsearchClient client, PreviewDataFrameTransformAction action) {
123+
super(client, action, new Request());
124+
}
125+
}
126+
127+
public static class Response extends ActionResponse implements ToXContentObject {
128+
129+
private List<Map<String, Object>> docs;
130+
public static ParseField PREVIEW = new ParseField("preview");
131+
132+
static ObjectParser<Response, Void> PARSER = new ObjectParser<>("data_frame_transform_preview", Response::new);
133+
static {
134+
PARSER.declareObjectArray(Response::setDocs, (p, c) -> p.mapOrdered(), PREVIEW);
135+
}
136+
public Response() {}
137+
138+
public Response(StreamInput in) throws IOException {
139+
int size = in.readInt();
140+
this.docs = new ArrayList<>(size);
141+
for (int i = 0; i < size; i++) {
142+
this.docs.add(in.readMap());
143+
}
144+
}
145+
146+
public Response(List<Map<String, Object>> docs) {
147+
this.docs = new ArrayList<>(docs);
148+
}
149+
150+
public void setDocs(List<Map<String, Object>> docs) {
151+
this.docs = new ArrayList<>(docs);
152+
}
153+
154+
@Override
155+
public void readFrom(StreamInput in) throws IOException {
156+
int size = in.readInt();
157+
this.docs = new ArrayList<>(size);
158+
for (int i = 0; i < size; i++) {
159+
this.docs.add(in.readMap());
160+
}
161+
}
162+
163+
@Override
164+
public void writeTo(StreamOutput out) throws IOException {
165+
out.writeInt(docs.size());
166+
for (Map<String, Object> doc : docs) {
167+
out.writeMapWithConsistentOrder(doc);
168+
}
169+
}
170+
171+
@Override
172+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
173+
builder.startObject();
174+
builder.field(PREVIEW.getPreferredName(), docs);
175+
builder.endObject();
176+
return builder;
177+
}
178+
179+
@Override
180+
public boolean equals(Object obj) {
181+
if (obj == this) {
182+
return true;
183+
}
184+
185+
if (obj == null || obj.getClass() != getClass()) {
186+
return false;
187+
}
188+
189+
Response other = (Response) obj;
190+
return Objects.equals(other.docs, docs);
191+
}
192+
193+
@Override
194+
public int hashCode() {
195+
return Objects.hashCode(docs);
196+
}
197+
198+
public static Response fromXContent(final XContentParser parser) throws IOException {
199+
return PARSER.parse(parser, null);
200+
}
201+
}
202+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.dataframe.action;
8+
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.search.SearchAction;
11+
import org.elasticsearch.action.support.ActionFilters;
12+
import org.elasticsearch.action.support.HandledTransportAction;
13+
import org.elasticsearch.client.Client;
14+
import org.elasticsearch.common.inject.Inject;
15+
import org.elasticsearch.license.LicenseUtils;
16+
import org.elasticsearch.license.XPackLicenseState;
17+
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
18+
import org.elasticsearch.tasks.Task;
19+
import org.elasticsearch.threadpool.ThreadPool;
20+
import org.elasticsearch.transport.TransportService;
21+
import org.elasticsearch.xpack.core.ClientHelper;
22+
import org.elasticsearch.xpack.core.XPackField;
23+
import org.elasticsearch.xpack.core.dataframe.transform.DataFrameIndexerTransformStats;
24+
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
25+
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.function.Supplier;
29+
import java.util.stream.Collectors;
30+
31+
import static org.elasticsearch.xpack.dataframe.transforms.DataFrameIndexer.COMPOSITE_AGGREGATION_NAME;
32+
33+
public class TransportPreviewDataFrameTransformAction extends
34+
HandledTransportAction<PreviewDataFrameTransformAction.Request, PreviewDataFrameTransformAction.Response> {
35+
36+
private final XPackLicenseState licenseState;
37+
private final Client client;
38+
private final ThreadPool threadPool;
39+
40+
@Inject
41+
public TransportPreviewDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
42+
Client client, ThreadPool threadPool, XPackLicenseState licenseState) {
43+
super(PreviewDataFrameTransformAction.NAME,transportService, actionFilters,
44+
(Supplier<PreviewDataFrameTransformAction.Request>) PreviewDataFrameTransformAction.Request::new);
45+
this.licenseState = licenseState;
46+
this.client = client;
47+
this.threadPool = threadPool;
48+
}
49+
50+
@Override
51+
protected void doExecute(Task task,
52+
PreviewDataFrameTransformAction.Request request,
53+
ActionListener<PreviewDataFrameTransformAction.Response> listener) {
54+
if (!licenseState.isDataFrameAllowed()) {
55+
listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME));
56+
return;
57+
}
58+
59+
Pivot pivot = new Pivot(request.getConfig().getSource(),
60+
request.getConfig().getQueryConfig().getQuery(),
61+
request.getConfig().getPivotConfig());
62+
63+
getPreview(pivot, ActionListener.wrap(
64+
previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)),
65+
listener::onFailure
66+
));
67+
}
68+
69+
private void getPreview(Pivot pivot, ActionListener<List<Map<String, Object>>> listener) {
70+
ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(),
71+
ClientHelper.DATA_FRAME_ORIGIN,
72+
client,
73+
SearchAction.INSTANCE,
74+
pivot.buildSearchRequest(null),
75+
ActionListener.wrap(
76+
r -> {
77+
final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
78+
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
79+
listener.onResponse(pivot.extractResults(agg, stats).collect(Collectors.toList()));
80+
},
81+
listener::onFailure
82+
));
83+
}
84+
}

0 commit comments

Comments
 (0)