Skip to content

Commit 9f29749

Browse files
authored
[ML][Data Frame] adds new pipeline field to dest config (#43124)
* [ML][Data Frame] adds new pipeline field to dest config * Adding pipeline support to _preview * removing unused import * moving towards extracting _source from pipeline simulation * fixing permission requirement, adding _index entry to doc
1 parent 3c6bc34 commit 9f29749

File tree

18 files changed

+410
-78
lines changed

18 files changed

+410
-78
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DestConfig.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,36 +28,48 @@
2828
import java.util.Objects;
2929

3030
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
31+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
3132

3233
/**
3334
* Configuration containing the destination index for the {@link DataFrameTransformConfig}
3435
*/
3536
public class DestConfig implements ToXContentObject {
3637

3738
public static final ParseField INDEX = new ParseField("index");
39+
public static final ParseField PIPELINE = new ParseField("pipeline");
3840

3941
public static final ConstructingObjectParser<DestConfig, Void> PARSER = new ConstructingObjectParser<>("data_frame_config_dest",
4042
true,
41-
args -> new DestConfig((String)args[0]));
43+
args -> new DestConfig((String)args[0], (String)args[1]));
4244

4345
static {
4446
PARSER.declareString(constructorArg(), INDEX);
47+
PARSER.declareString(optionalConstructorArg(), PIPELINE);
4548
}
4649

4750
private final String index;
51+
private final String pipeline;
4852

49-
public DestConfig(String index) {
53+
DestConfig(String index, String pipeline) {
5054
this.index = Objects.requireNonNull(index, INDEX.getPreferredName());
55+
this.pipeline = pipeline;
5156
}
5257

5358
public String getIndex() {
5459
return index;
5560
}
5661

62+
public String getPipeline() {
63+
return pipeline;
64+
}
65+
5766
@Override
5867
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
5968
builder.startObject();
6069
builder.field(INDEX.getPreferredName(), index);
70+
if (pipeline != null) {
71+
builder.field(PIPELINE.getPreferredName(), pipeline);
72+
}
6173
builder.endObject();
6274
return builder;
6375
}
@@ -72,11 +84,45 @@ public boolean equals(Object other) {
7284
}
7385

7486
DestConfig that = (DestConfig) other;
75-
return Objects.equals(index, that.index);
87+
return Objects.equals(index, that.index) &&
88+
Objects.equals(pipeline, that.pipeline);
7689
}
7790

7891
@Override
7992
public int hashCode(){
80-
return Objects.hash(index);
93+
return Objects.hash(index, pipeline);
94+
}
95+
96+
public static Builder builder() {
97+
return new Builder();
98+
}
99+
100+
public static class Builder {
101+
private String index;
102+
private String pipeline;
103+
104+
/**
105+
* Sets which index to which to write the data
106+
* @param index where to write the data
107+
* @return The {@link Builder} with index set
108+
*/
109+
public Builder setIndex(String index) {
110+
this.index = Objects.requireNonNull(index, INDEX.getPreferredName());
111+
return this;
112+
}
113+
114+
/**
115+
* Sets the pipeline through which the indexed documents should be processed
116+
* @param pipeline The pipeline ID
117+
* @return The {@link Builder} with pipeline set
118+
*/
119+
public Builder setPipeline(String pipeline) {
120+
this.pipeline = pipeline;
121+
return this;
122+
}
123+
124+
public DestConfig build() {
125+
return new DestConfig(index, pipeline);
126+
}
81127
}
82128
}

client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ private DataFrameTransformConfig validDataFrameTransformConfig(String id, String
307307
aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
308308
PivotConfig pivotConfig = PivotConfig.builder().setGroups(groupConfig).setAggregations(aggBuilder).build();
309309

310-
DestConfig destConfig = (destination != null) ? new DestConfig(destination) : null;
310+
DestConfig destConfig = (destination != null) ? DestConfig.builder().setIndex(destination).build() : null;
311311

312312
return DataFrameTransformConfig.builder()
313313
.setId(id)
@@ -333,7 +333,7 @@ public void testGetStats() throws Exception {
333333
DataFrameTransformConfig transform = DataFrameTransformConfig.builder()
334334
.setId(id)
335335
.setSource(SourceConfig.builder().setIndex(sourceIndex).setQuery(new MatchAllQueryBuilder()).build())
336-
.setDest(new DestConfig("pivot-dest"))
336+
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
337337
.setPivotConfig(pivotConfig)
338338
.setDescription("transform for testing stats")
339339
.build();

client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DestConfigTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
public class DestConfigTests extends AbstractXContentTestCase<DestConfig> {
2828

2929
public static DestConfig randomDestConfig() {
30-
return new DestConfig(randomAlphaOfLength(10));
30+
return new DestConfig(randomAlphaOfLength(10),
31+
randomBoolean() ? null : randomAlphaOfLength(10));
3132
}
3233

3334
@Override

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException
125125
.setIndex("source-index")
126126
.setQueryConfig(queryConfig).build();
127127
// end::put-data-frame-transform-source-config
128+
// tag::put-data-frame-transform-dest-config
129+
DestConfig destConfig = DestConfig.builder()
130+
.setIndex("pivot-destination")
131+
.setPipeline("my-pipeline").build();
132+
// end::put-data-frame-transform-dest-config
128133
// tag::put-data-frame-transform-group-config
129134
GroupConfig groupConfig = GroupConfig.builder()
130135
.groupBy("reviewer", // <1>
@@ -149,7 +154,7 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException
149154
.builder()
150155
.setId("reviewer-avg-rating") // <1>
151156
.setSource(sourceConfig) // <2>
152-
.setDest(new DestConfig("pivot-destination")) // <3>
157+
.setDest(destConfig) // <3>
153158
.setPivotConfig(pivotConfig) // <4>
154159
.setDescription("This is my test transform") // <5>
155160
.build();
@@ -222,7 +227,7 @@ public void testStartStop() throws IOException, InterruptedException {
222227
DataFrameTransformConfig transformConfig = DataFrameTransformConfig.builder()
223228
.setId("mega-transform")
224229
.setSource(SourceConfig.builder().setIndex("source-data").setQueryConfig(queryConfig).build())
225-
.setDest(new DestConfig("pivot-dest"))
230+
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
226231
.setPivotConfig(pivotConfig)
227232
.build();
228233

@@ -344,7 +349,7 @@ public void testDeleteDataFrameTransform() throws IOException, InterruptedExcept
344349
.setIndex("source-data")
345350
.setQuery(new MatchAllQueryBuilder())
346351
.build())
347-
.setDest(new DestConfig("pivot-dest"))
352+
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
348353
.setPivotConfig(pivotConfig)
349354
.build();
350355
DataFrameTransformConfig transformConfig2 = DataFrameTransformConfig.builder()
@@ -353,7 +358,7 @@ public void testDeleteDataFrameTransform() throws IOException, InterruptedExcept
353358
.setIndex("source-data")
354359
.setQuery(new MatchAllQueryBuilder())
355360
.build())
356-
.setDest(new DestConfig("pivot-dest2"))
361+
.setDest(DestConfig.builder().setIndex("pivot-dest2").build())
357362
.setPivotConfig(pivotConfig)
358363
.build();
359364

@@ -488,7 +493,7 @@ public void testGetStats() throws IOException, InterruptedException {
488493
.setIndex("source-data")
489494
.setQuery(new MatchAllQueryBuilder())
490495
.build())
491-
.setDest(new DestConfig("pivot-dest"))
496+
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
492497
.setPivotConfig(pivotConfig)
493498
.build();
494499
client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig), RequestOptions.DEFAULT);
@@ -574,7 +579,7 @@ public void testGetDataFrameTransform() throws IOException, InterruptedException
574579
.setIndex("source-data")
575580
.setQuery(new MatchAllQueryBuilder())
576581
.build())
577-
.setDest(new DestConfig("pivot-dest"))
582+
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
578583
.setPivotConfig(pivotConfig)
579584
.build();
580585

docs/java-rest/high-level/dataframe/put_data_frame.asciidoc

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ include-tagged::{doc-tests-file}[{api}-config]
3333
--------------------------------------------------
3434
<1> The {dataframe-transform} ID
3535
<2> The source indices and query from which to gather data
36-
<3> The destination index
36+
<3> The destination index and optional pipeline
3737
<4> The PivotConfig
3838
<5> Optional free text description of the transform
3939

@@ -49,6 +49,16 @@ If query is not set, a `match_all` query is used by default.
4949
include-tagged::{doc-tests-file}[{api}-source-config]
5050
--------------------------------------------------
5151

52+
==== DestConfig
53+
54+
The index where to write the data and the optional pipeline
55+
through which the docs should be indexed
56+
57+
["source","java",subs="attributes,callouts,macros"]
58+
--------------------------------------------------
59+
include-tagged::{doc-tests-file}[{api}-dest-config]
60+
--------------------------------------------------
61+
5262
===== QueryConfig
5363

5464
The query with which to select data from the source.

docs/reference/data-frames/apis/put-transform.asciidoc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ IMPORTANT: You must use {kib} or this API to create a {dataframe-transform}.
3838
`source` (required):: (object) The source configuration, consisting of `index` and optionally
3939
a `query`.
4040

41-
`dest` (required):: (object) The destination configuration, consisting of `index`.
41+
`dest` (required):: (object) The destination configuration, consisting of `index` and optionally a
42+
`pipeline` id.
4243

4344
`pivot`:: (object) Defines the pivot function `group by` fields and the aggregation to
4445
reduce the data. See <<data-frame-transform-pivot, data frame transform pivot objects>>.
@@ -76,7 +77,8 @@ PUT _data_frame/transforms/ecommerce_transform
7677
}
7778
},
7879
"dest": {
79-
"index": "kibana_sample_data_ecommerce_transform"
80+
"index": "kibana_sample_data_ecommerce_transform",
81+
"pipeline": "add_timestamp_pipeline"
8082
},
8183
"pivot": {
8284
"group_by": {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformAction.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@
2424
import org.elasticsearch.common.xcontent.XContentType;
2525
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
2626
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
27+
import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
2728

2829
import java.io.IOException;
2930
import java.util.ArrayList;
30-
import java.util.Collections;
31+
import java.util.HashMap;
3132
import java.util.List;
3233
import java.util.Map;
3334
import java.util.Objects;
@@ -66,8 +67,20 @@ public Request(StreamInput in) throws IOException {
6667

6768
public static Request fromXContent(final XContentParser parser) throws IOException {
6869
Map<String, Object> content = parser.map();
69-
// Destination and ID are not required for Preview, so we just supply our own
70-
content.put(DataFrameField.DESTINATION.getPreferredName(), Collections.singletonMap("index", "unused-transform-preview-index"));
70+
// dest.index and ID are not required for Preview, so we just supply our own
71+
Map<String, String> tempDestination = new HashMap<>();
72+
tempDestination.put(DestConfig.INDEX.getPreferredName(), "unused-transform-preview-index");
73+
// Users can still provide just dest.pipeline to preview what their data would look like given the pipeline ID
74+
Object providedDestination = content.get(DataFrameField.DESTINATION.getPreferredName());
75+
if (providedDestination instanceof Map) {
76+
@SuppressWarnings("unchecked")
77+
Map<String, String> destMap = (Map<String, String>)providedDestination;
78+
String pipeline = destMap.get(DestConfig.PIPELINE.getPreferredName());
79+
if (pipeline != null) {
80+
tempDestination.put(DestConfig.PIPELINE.getPreferredName(), pipeline);
81+
}
82+
}
83+
content.put(DataFrameField.DESTINATION.getPreferredName(), tempDestination);
7184
content.put(DataFrameField.ID.getPreferredName(), "transform-preview");
7285
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(content);
7386
XContentParser newParser = XContentType.JSON

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package org.elasticsearch.xpack.core.dataframe.transforms;
88

9+
import org.elasticsearch.Version;
910
import org.elasticsearch.common.ParseField;
1011
import org.elasticsearch.common.io.stream.StreamInput;
1112
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -20,49 +21,69 @@
2021
import java.util.Objects;
2122

2223
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
24+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
2325

2426
public class DestConfig implements Writeable, ToXContentObject {
2527

2628
public static final ParseField INDEX = new ParseField("index");
29+
public static final ParseField PIPELINE = new ParseField("pipeline");
2730

2831
public static final ConstructingObjectParser<DestConfig, Void> STRICT_PARSER = createParser(false);
2932
public static final ConstructingObjectParser<DestConfig, Void> LENIENT_PARSER = createParser(true);
3033

3134
private static ConstructingObjectParser<DestConfig, Void> createParser(boolean lenient) {
3235
ConstructingObjectParser<DestConfig, Void> parser = new ConstructingObjectParser<>("data_frame_config_dest",
3336
lenient,
34-
args -> new DestConfig((String)args[0]));
37+
args -> new DestConfig((String)args[0], (String) args[1]));
3538
parser.declareString(constructorArg(), INDEX);
39+
parser.declareString(optionalConstructorArg(), PIPELINE);
3640
return parser;
3741
}
3842

3943
private final String index;
44+
private final String pipeline;
4045

41-
public DestConfig(String index) {
46+
public DestConfig(String index, String pipeline) {
4247
this.index = ExceptionsHelper.requireNonNull(index, INDEX.getPreferredName());
48+
this.pipeline = pipeline;
4349
}
4450

4551
public DestConfig(final StreamInput in) throws IOException {
4652
index = in.readString();
53+
if (in.getVersion().onOrAfter(Version.CURRENT)) {
54+
pipeline = in.readOptionalString();
55+
} else {
56+
pipeline = null;
57+
}
4758
}
4859

4960
public String getIndex() {
5061
return index;
5162
}
5263

64+
public String getPipeline() {
65+
return pipeline;
66+
}
67+
5368
public boolean isValid() {
5469
return index.isEmpty() == false;
5570
}
5671

5772
@Override
5873
public void writeTo(StreamOutput out) throws IOException {
5974
out.writeString(index);
75+
if (out.getVersion().onOrAfter(Version.CURRENT)) {
76+
out.writeOptionalString(pipeline);
77+
}
6078
}
6179

6280
@Override
6381
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
6482
builder.startObject();
6583
builder.field(INDEX.getPreferredName(), index);
84+
if (pipeline != null) {
85+
builder.field(PIPELINE.getPreferredName(), pipeline);
86+
}
6687
builder.endObject();
6788
return builder;
6889
}
@@ -77,12 +98,13 @@ public boolean equals(Object other) {
7798
}
7899

79100
DestConfig that = (DestConfig) other;
80-
return Objects.equals(index, that.index);
101+
return Objects.equals(index, that.index) &&
102+
Objects.equals(pipeline, that.pipeline);
81103
}
82104

83105
@Override
84106
public int hashCode(){
85-
return Objects.hash(index);
107+
return Objects.hash(index, pipeline);
86108
}
87109

88110
public static DestConfig fromXContent(final XContentParser parser, boolean lenient) throws IOException {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ protected boolean supportsUnknownFields() {
4040
@Override
4141
protected Request createTestInstance() {
4242
DataFrameTransformConfig config = new DataFrameTransformConfig("transform-preview", randomSourceConfig(),
43-
new DestConfig("unused-transform-preview-index"),
43+
new DestConfig("unused-transform-preview-index", null),
4444
null, PivotConfigTests.randomPivotConfig(), null);
4545
return new Request(config);
4646
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfigTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ public class DestConfigTests extends AbstractSerializingDataFrameTestCase<DestCo
1717
private boolean lenient;
1818

1919
public static DestConfig randomDestConfig() {
20-
return new DestConfig(randomAlphaOfLength(10));
20+
return new DestConfig(randomAlphaOfLength(10),
21+
randomBoolean() ? null : randomAlphaOfLength(10));
2122
}
2223

2324
@Before

x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ protected DataFrameTransformConfig createTransformConfig(String id,
205205
return DataFrameTransformConfig.builder()
206206
.setId(id)
207207
.setSource(SourceConfig.builder().setIndex(sourceIndices).setQueryConfig(createQueryConfig(queryBuilder)).build())
208-
.setDest(new DestConfig(destinationIndex))
208+
.setDest(DestConfig.builder().setIndex(destinationIndex).build())
209209
.setPivotConfig(createPivotConfig(groups, aggregations))
210210
.setDescription("Test data frame transform config id: " + id)
211211
.build();

0 commit comments

Comments
 (0)