Skip to content

Commit e50a78c

Browse files
authored
[ML-DataFrame] version data frame transform internal index (elastic#45375) (elastic#45837)
Adds index versioning for the internal data frame transform index. Allows for new indices to be created and referenced, `GET` requests now query over the index pattern and takes the latest doc (based on INDEX name).
1 parent 1dab739 commit e50a78c

21 files changed

+607
-197
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportGetResourcesAction.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ protected void searchResources(AbstractGetResourcesRequest request, ActionListen
8080
sourceBuilder.from(request.getPageParams().getFrom())
8181
.size(request.getPageParams().getSize());
8282
}
83+
sourceBuilder.trackTotalHits(true);
8384

8485
IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;
8586
SearchRequest searchRequest = new SearchRequest(getIndices())
@@ -88,7 +89,7 @@ protected void searchResources(AbstractGetResourcesRequest request, ActionListen
8889
indicesOptions.expandWildcardsOpen(),
8990
indicesOptions.expandWildcardsClosed(),
9091
indicesOptions))
91-
.source(sourceBuilder.trackTotalHits(true));
92+
.source(customSearchOptions(sourceBuilder));
9293

9394
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
9495
executionOrigin(),
@@ -105,8 +106,12 @@ public void onResponse(SearchResponse response) {
105106
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
106107
xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
107108
Resource resource = parse(parser);
108-
docs.add(resource);
109-
foundResourceIds.add(extractIdFromResource(resource));
109+
String id = extractIdFromResource(resource);
110+
// Do not include a resource with the same ID twice
111+
if (foundResourceIds.contains(id) == false) {
112+
docs.add(resource);
113+
foundResourceIds.add(id);
114+
}
110115
} catch (IOException e) {
111116
this.onFailure(e);
112117
}
@@ -159,6 +164,10 @@ private QueryBuilder buildQuery(String[] tokens, String resourceIdField) {
159164
return boolQuery.hasClauses() ? boolQuery : QueryBuilders.matchAllQuery();
160165
}
161166

167+
protected SearchSourceBuilder customSearchOptions(SearchSourceBuilder searchSourceBuilder) {
168+
return searchSourceBuilder;
169+
}
170+
162171
@Nullable
163172
protected QueryBuilder additionalQuery() {
164173
return null;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ public final class DataFrameField {
2626
public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
2727
public static final ParseField SOURCE = new ParseField("source");
2828
public static final ParseField DESCRIPTION = new ParseField("description");
29+
public static final ParseField VERSION = new ParseField("version");
30+
public static final ParseField CREATE_TIME = new ParseField("create_time");
2931
public static final ParseField DESTINATION = new ParseField("dest");
3032
public static final ParseField FREQUENCY = new ParseField("frequency");
3133
public static final ParseField FORCE = new ParseField("force");
@@ -65,7 +67,6 @@ public final class DataFrameField {
6567
// strings for meta information
6668
public static final String META_FIELDNAME = "_data_frame";
6769
public static final String CREATION_DATE_MILLIS = "creation_date_in_millis";
68-
public static final String VERSION = "version";
6970
public static final String CREATED = "created";
7071
public static final String CREATED_BY = "created_by";
7172
public static final String TRANSFORM = "transform";

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> implements XPackPlugin.XPackPersistentTaskParams {
2525

2626
public static final String NAME = DataFrameField.TASK_NAME;
27-
public static final ParseField VERSION = new ParseField(DataFrameField.VERSION);
2827
public static final ParseField FREQUENCY = DataFrameField.FREQUENCY;
2928

3029
private final String transformId;
@@ -36,7 +35,7 @@ public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> imp
3635

3736
static {
3837
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID);
39-
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), VERSION);
38+
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), DataFrameField.VERSION);
4039
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY);
4140
}
4241

@@ -90,7 +89,7 @@ public void writeTo(StreamOutput out) throws IOException {
9089
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
9190
builder.startObject();
9291
builder.field(DataFrameField.ID.getPreferredName(), transformId);
93-
builder.field(VERSION.getPreferredName(), version);
92+
builder.field(DataFrameField.VERSION.getPreferredName(), version);
9493
if (frequency != null) {
9594
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
9695
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
4747
// types of transforms
4848
public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
4949

50-
public static final ParseField VERSION = new ParseField("version");
51-
public static final ParseField CREATE_TIME = new ParseField("create_time");
5250
private static final ConstructingObjectParser<DataFrameTransformConfig, String> STRICT_PARSER = createParser(false);
5351
private static final ConstructingObjectParser<DataFrameTransformConfig, String> LENIENT_PARSER = createParser(true);
5452
static final int MAX_DESCRIPTION_LENGTH = 1_000;
@@ -98,8 +96,8 @@ private static ConstructingObjectParser<DataFrameTransformConfig, String> create
9896
// on strict parsing do not allow injection of headers, transform version, or create time
9997
if (lenient == false) {
10098
validateStrictParsingParams(args[6], HEADERS.getPreferredName());
101-
validateStrictParsingParams(args[9], CREATE_TIME.getPreferredName());
102-
validateStrictParsingParams(args[10], VERSION.getPreferredName());
99+
validateStrictParsingParams(args[9], DataFrameField.CREATE_TIME.getPreferredName());
100+
validateStrictParsingParams(args[10], DataFrameField.VERSION.getPreferredName());
103101
}
104102

105103
@SuppressWarnings("unchecked")
@@ -132,8 +130,9 @@ private static ConstructingObjectParser<DataFrameTransformConfig, String> create
132130
parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM);
133131
parser.declareString(optionalConstructorArg(), DataFrameField.DESCRIPTION);
134132
parser.declareField(optionalConstructorArg(),
135-
p -> TimeUtils.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()), CREATE_TIME, ObjectParser.ValueType.VALUE);
136-
parser.declareString(optionalConstructorArg(), VERSION);
133+
p -> TimeUtils.parseTimeFieldToInstant(p, DataFrameField.CREATE_TIME.getPreferredName()), DataFrameField.CREATE_TIME,
134+
ObjectParser.ValueType.VALUE);
135+
parser.declareString(optionalConstructorArg(), DataFrameField.VERSION);
137136
return parser;
138137
}
139138

@@ -256,7 +255,7 @@ public Instant getCreateTime() {
256255
}
257256

258257
public DataFrameTransformConfig setCreateTime(Instant createTime) {
259-
ExceptionsHelper.requireNonNull(createTime, CREATE_TIME.getPreferredName());
258+
ExceptionsHelper.requireNonNull(createTime, DataFrameField.CREATE_TIME.getPreferredName());
260259
this.createTime = Instant.ofEpochMilli(createTime.toEpochMilli());
261260
return this;
262261
}
@@ -332,10 +331,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
332331
builder.field(DataFrameField.DESCRIPTION.getPreferredName(), description);
333332
}
334333
if (transformVersion != null) {
335-
builder.field(VERSION.getPreferredName(), transformVersion);
334+
builder.field(DataFrameField.VERSION.getPreferredName(), transformVersion);
336335
}
337336
if (createTime != null) {
338-
builder.timeField(CREATE_TIME.getPreferredName(), CREATE_TIME.getPreferredName() + "_string", createTime.toEpochMilli());
337+
builder.timeField(DataFrameField.CREATE_TIME.getPreferredName(), DataFrameField.CREATE_TIME.getPreferredName() + "_string",
338+
createTime.toEpochMilli());
339339
}
340340
builder.endObject();
341341
return builder;

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ public void testDeleteConfigurationLeftOver() throws IOException {
4242
builder.endObject();
4343
final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
4444
Request req = new Request("PUT",
45-
DataFrameInternalIndex.INDEX_NAME + "/_doc/" + DataFrameTransformConfig.documentId(fakeTransformName));
45+
DataFrameInternalIndex.LATEST_INDEX_NAME + "/_doc/" + DataFrameTransformConfig.documentId(fakeTransformName));
4646
req.setEntity(entity);
4747
client().performRequest(req);
4848
}
4949

5050
// refresh the index
51-
assertOK(client().performRequest(new Request("POST", DataFrameInternalIndex.INDEX_NAME + "/_refresh")));
51+
assertOK(client().performRequest(new Request("POST", DataFrameInternalIndex.LATEST_INDEX_NAME + "/_refresh")));
5252

5353
Request deleteRequest = new Request("DELETE", DATAFRAME_ENDPOINT + fakeTransformName);
5454
Response deleteResponse = client().performRequest(deleteRequest);

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ public void wipeDataFrameTransforms() throws IOException {
385385
assertTrue(transformConfigs.isEmpty());
386386

387387
// the configuration index should be empty
388-
Request request = new Request("GET", DataFrameInternalIndex.INDEX_NAME + "/_search");
388+
Request request = new Request("GET", DataFrameInternalIndex.LATEST_INDEX_NAME + "/_search");
389389
try {
390390
Response searchResponse = adminClient().performRequest(request);
391391
Map<String, Object> searchResult = entityAsMap(searchResponse);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.dataframe.integration;
8+
9+
import org.elasticsearch.action.get.GetRequest;
10+
import org.elasticsearch.action.get.GetResponse;
11+
import org.elasticsearch.action.index.IndexRequest;
12+
import org.elasticsearch.action.support.WriteRequest;
13+
import org.elasticsearch.client.RequestOptions;
14+
import org.elasticsearch.client.RestHighLevelClient;
15+
import org.elasticsearch.client.dataframe.GetDataFrameTransformRequest;
16+
import org.elasticsearch.client.dataframe.GetDataFrameTransformResponse;
17+
import org.elasticsearch.client.dataframe.UpdateDataFrameTransformRequest;
18+
import org.elasticsearch.client.dataframe.UpdateDataFrameTransformResponse;
19+
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate;
20+
import org.elasticsearch.common.xcontent.XContentBuilder;
21+
import org.elasticsearch.common.xcontent.XContentFactory;
22+
import org.elasticsearch.search.SearchModule;
23+
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
24+
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
25+
import org.elasticsearch.client.indices.CreateIndexRequest;
26+
import org.elasticsearch.common.settings.Settings;
27+
import org.elasticsearch.common.util.concurrent.ThreadContext;
28+
import org.elasticsearch.common.xcontent.XContentType;
29+
import org.elasticsearch.test.rest.ESRestTestCase;
30+
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
31+
32+
import java.io.IOException;
33+
import java.nio.charset.StandardCharsets;
34+
import java.util.Base64;
35+
import java.util.Collections;
36+
37+
import static org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex.addDataFrameTransformsConfigMappings;
38+
import static org.hamcrest.Matchers.is;
39+
import static org.hamcrest.Matchers.equalTo;
40+
41+
42+
public class DataFrameTransformInternalIndexIT extends ESRestTestCase {
43+
44+
45+
private static final String CURRENT_INDEX = DataFrameInternalIndex.LATEST_INDEX_NAME;
46+
private static final String OLD_INDEX = DataFrameInternalIndex.INDEX_PATTERN + "1";
47+
48+
49+
public void testUpdateDeletesOldTransformConfig() throws Exception {
50+
TestRestHighLevelClient client = new TestRestHighLevelClient();
51+
// The mapping does not need to actually be the "OLD" mapping, we are testing that the old doc gets deleted, and the new one
52+
// created.
53+
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
54+
builder.startObject();
55+
builder.startObject("properties");
56+
builder.startObject(DataFrameField.INDEX_DOC_TYPE.getPreferredName()).field("type", "keyword").endObject();
57+
addDataFrameTransformsConfigMappings(builder);
58+
builder.endObject();
59+
builder.endObject();
60+
client.indices().create(new CreateIndexRequest(OLD_INDEX).mapping(builder), RequestOptions.DEFAULT);
61+
}
62+
String transformIndex = "transform-index-deletes-old";
63+
createSourceIndex(transformIndex);
64+
String transformId = "transform-update-deletes-old-transform-config";
65+
String config = "{\"dest\": {\"index\":\"bar\"},"
66+
+ " \"source\": {\"index\":\"" + transformIndex + "\", \"query\": {\"match_all\":{}}},"
67+
+ " \"id\": \""+transformId+"\","
68+
+ " \"doc_type\": \"data_frame_transform_config\","
69+
+ " \"pivot\": {"
70+
+ " \"group_by\": {"
71+
+ " \"reviewer\": {"
72+
+ " \"terms\": {"
73+
+ " \"field\": \"user_id\""
74+
+ " } } },"
75+
+ " \"aggregations\": {"
76+
+ " \"avg_rating\": {"
77+
+ " \"avg\": {"
78+
+ " \"field\": \"stars\""
79+
+ " } } } },"
80+
+ "\"frequency\":\"1s\""
81+
+ "}";
82+
client.index(new IndexRequest(OLD_INDEX)
83+
.id(DataFrameTransformConfig.documentId(transformId))
84+
.source(config, XContentType.JSON)
85+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
86+
RequestOptions.DEFAULT);
87+
GetResponse getResponse = client.get(new GetRequest(OLD_INDEX, DataFrameTransformConfig.documentId(transformId)),
88+
RequestOptions.DEFAULT);
89+
assertThat(getResponse.isExists(), is(true));
90+
91+
GetDataFrameTransformResponse response = client.dataFrame()
92+
.getDataFrameTransform(new GetDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
93+
assertThat(response.getTransformConfigurations().get(0).getId(), equalTo(transformId));
94+
95+
UpdateDataFrameTransformResponse updated = client.dataFrame().updateDataFrameTransform(
96+
new UpdateDataFrameTransformRequest(DataFrameTransformConfigUpdate.builder().setDescription("updated").build(), transformId),
97+
RequestOptions.DEFAULT);
98+
99+
assertThat(updated.getTransformConfiguration().getId(), equalTo(transformId));
100+
assertThat(updated.getTransformConfiguration().getDescription(), equalTo("updated"));
101+
102+
// Old should now be gone
103+
getResponse = client.get(new GetRequest(OLD_INDEX, DataFrameTransformConfig.documentId(transformId)), RequestOptions.DEFAULT);
104+
assertThat(getResponse.isExists(), is(false));
105+
106+
// New should be here
107+
getResponse = client.get(new GetRequest(CURRENT_INDEX, DataFrameTransformConfig.documentId(transformId)),
108+
RequestOptions.DEFAULT);
109+
assertThat(getResponse.isExists(), is(true));
110+
}
111+
112+
113+
@Override
114+
protected Settings restClientSettings() {
115+
final String token = "Basic " +
116+
Base64.getEncoder().encodeToString(("x_pack_rest_user:x-pack-test-password").getBytes(StandardCharsets.UTF_8));
117+
return Settings.builder()
118+
.put(ThreadContext.PREFIX + ".Authorization", token)
119+
.build();
120+
}
121+
122+
private void createSourceIndex(String index) throws IOException {
123+
TestRestHighLevelClient client = new TestRestHighLevelClient();
124+
client.indices().create(new CreateIndexRequest(index), RequestOptions.DEFAULT);
125+
}
126+
127+
private class TestRestHighLevelClient extends RestHighLevelClient {
128+
TestRestHighLevelClient() {
129+
super(client(), restClient -> {}, new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedXContents());
130+
}
131+
}
132+
}

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void testUsage() throws Exception {
5454
stopDataFrameTransform("test_usage", false);
5555

5656
Request statsExistsRequest = new Request("GET",
57-
DataFrameInternalIndex.INDEX_NAME+"/_search?q=" +
57+
DataFrameInternalIndex.LATEST_INDEX_NAME+"/_search?q=" +
5858
INDEX_DOC_TYPE.getPreferredName() + ":" +
5959
DataFrameTransformStoredDoc.NAME);
6060
// Verify that we have one stat document
@@ -96,7 +96,7 @@ public void testUsage() throws Exception {
9696
XContentMapValues.extractValue("data_frame.stats." + statName, statsMap));
9797
}
9898
// Refresh the index so that statistics are searchable
99-
refreshIndex(DataFrameInternalIndex.INDEX_TEMPLATE_NAME);
99+
refreshIndex(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME);
100100
}, 60, TimeUnit.SECONDS);
101101

102102

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
198198
public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDataUpgrader() {
199199
return templates -> {
200200
try {
201-
templates.put(DataFrameInternalIndex.INDEX_TEMPLATE_NAME, DataFrameInternalIndex.getIndexTemplateMetaData());
201+
templates.put(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME, DataFrameInternalIndex.getIndexTemplateMetaData());
202202
} catch (IOException e) {
203203
logger.error("Error creating data frame index template", e);
204204
}

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
154154
}
155155
);
156156

157-
SearchRequest totalTransformCount = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
157+
SearchRequest totalTransformCount = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN)
158158
.setTrackTotalHits(true)
159159
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
160160
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformConfig.NAME))))
@@ -196,7 +196,7 @@ static void getStatisticSummations(Client client, ActionListener<DataFrameIndexe
196196
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(),
197197
DataFrameTransformStoredDoc.NAME)));
198198

199-
SearchRequestBuilder requestBuilder = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
199+
SearchRequestBuilder requestBuilder = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN)
200200
.setSize(0)
201201
.setQuery(queryBuilder);
202202

0 commit comments

Comments
 (0)