Skip to content

Commit 6a8bef4

Browse files
committed
[ML] Datafeed config CRUD operations (#32854)
1 parent 054ae64 commit 6a8bef4

23 files changed

+1048
-308
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetaIndex.java

-2
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ public final class MlMetaIndex {
2121
*/
2222
public static final String INDEX_NAME = ".ml-meta";
2323

24-
public static final String INCLUDE_TYPE_KEY = "include_type";
25-
2624
public static final String TYPE = "doc";
2725

2826
private MlMetaIndex() {}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOut
167167
@Override
168168
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
169169
DelegatingMapParams extendedParams =
170-
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_CLUSTER_STATE, "true"), params);
170+
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params);
171171
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
172172
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
173173
return builder;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,7 @@ public void writeTo(StreamOutput out) throws IOException {
143143

144144
@Override
145145
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
146-
builder.startObject();
147-
datafeed.doXContentBody(builder, params);
148-
builder.endObject();
146+
datafeed.toXContent(builder, params);
149147
return builder;
150148
}
151149

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/calendars/Calendar.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import org.elasticsearch.common.xcontent.ObjectParser;
1414
import org.elasticsearch.common.xcontent.ToXContentObject;
1515
import org.elasticsearch.common.xcontent.XContentBuilder;
16-
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
16+
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
1717

1818
import java.io.IOException;
1919
import java.util.Arrays;
@@ -111,7 +111,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
111111
if (description != null) {
112112
builder.field(DESCRIPTION.getPreferredName(), description);
113113
}
114-
if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) {
114+
if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false)) {
115115
builder.field(TYPE.getPreferredName(), CALENDAR_TYPE);
116116
}
117117
builder.endObject();

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/calendars/ScheduledEvent.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
import org.elasticsearch.common.xcontent.ToXContentObject;
1616
import org.elasticsearch.common.xcontent.XContentBuilder;
1717
import org.elasticsearch.common.xcontent.XContentParser;
18-
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
1918
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
2019
import org.elasticsearch.xpack.core.ml.job.config.Operator;
2120
import org.elasticsearch.xpack.core.ml.job.config.RuleAction;
2221
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
2322
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
2423
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2524
import org.elasticsearch.xpack.core.ml.utils.Intervals;
25+
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
2626
import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils;
2727

2828
import java.io.IOException;
@@ -170,7 +170,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
170170
if (eventId != null) {
171171
builder.field(EVENT_ID.getPreferredName(), eventId);
172172
}
173-
if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) {
173+
if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false)) {
174174
builder.field(TYPE.getPreferredName(), SCHEDULED_EVENT_TYPE);
175175
}
176176
builder.endObject();

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java

+25-7
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
7171
public static final String DOC_COUNT = "doc_count";
7272

7373
public static final ParseField ID = new ParseField("datafeed_id");
74+
public static final ParseField CONFIG_TYPE = new ParseField("config_type");
7475
public static final ParseField QUERY_DELAY = new ParseField("query_delay");
7576
public static final ParseField FREQUENCY = new ParseField("frequency");
7677
public static final ParseField INDEXES = new ParseField("indexes");
@@ -93,6 +94,7 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie
9394
ObjectParser<Builder, Void> parser = new ObjectParser<>("datafeed_config", ignoreUnknownFields, Builder::new);
9495

9596
parser.declareString(Builder::setId, ID);
97+
parser.declareString((c, s) -> {}, CONFIG_TYPE);
9698
parser.declareString(Builder::setJobId, Job.ID);
9799
parser.declareStringArray(Builder::setIndices, INDEXES);
98100
parser.declareStringArray(Builder::setIndices, INDICES);
@@ -202,6 +204,16 @@ public DatafeedConfig(StreamInput in) throws IOException {
202204
}
203205
}
204206

207+
/**
208+
* The name of datafeed configuration document name from the datafeed ID.
209+
*
210+
* @param datafeedId The datafeed ID
211+
* @return The ID of document the datafeed config is persisted in
212+
*/
213+
public static String documentId(String datafeedId) {
214+
return "datafeed-" + datafeedId;
215+
}
216+
205217
public String getId() {
206218
return id;
207219
}
@@ -210,6 +222,10 @@ public String getJobId() {
210222
return jobId;
211223
}
212224

225+
public String getConfigType() {
226+
return TYPE;
227+
}
228+
213229
public TimeValue getQueryDelay() {
214230
return queryDelay;
215231
}
@@ -304,14 +320,11 @@ public void writeTo(StreamOutput out) throws IOException {
304320
@Override
305321
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
306322
builder.startObject();
307-
doXContentBody(builder, params);
308-
builder.endObject();
309-
return builder;
310-
}
311-
312-
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
313323
builder.field(ID.getPreferredName(), id);
314324
builder.field(Job.ID.getPreferredName(), jobId);
325+
if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false) == true) {
326+
builder.field(CONFIG_TYPE.getPreferredName(), TYPE);
327+
}
315328
builder.field(QUERY_DELAY.getPreferredName(), queryDelay.getStringRep());
316329
if (frequency != null) {
317330
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
@@ -333,9 +346,10 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th
333346
if (chunkingConfig != null) {
334347
builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig);
335348
}
336-
if (headers.isEmpty() == false && params.paramAsBoolean(ToXContentParams.FOR_CLUSTER_STATE, false) == true) {
349+
if (headers.isEmpty() == false && params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false) == true) {
337350
builder.field(HEADERS.getPreferredName(), headers);
338351
}
352+
builder.endObject();
339353
return builder;
340354
}
341355

@@ -475,6 +489,10 @@ public void setId(String datafeedId) {
475489
id = ExceptionsHelper.requireNonNull(datafeedId, ID.getPreferredName());
476490
}
477491

492+
public String getId() {
493+
return id;
494+
}
495+
478496
public void setJobId(String jobId) {
479497
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
480498
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
315315
// negative means "unknown", which should only happen for a 5.4 job
316316
if (detectorIndex >= 0
317317
// no point writing this to cluster state, as the indexes will get reassigned on reload anyway
318-
&& params.paramAsBoolean(ToXContentParams.FOR_CLUSTER_STATE, false) == false) {
318+
&& params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false) == false) {
319319
builder.field(DETECTOR_INDEX.getPreferredName(), detectorIndex);
320320
}
321321
builder.endObject();

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/MlFilter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
import org.elasticsearch.common.xcontent.ObjectParser;
1515
import org.elasticsearch.common.xcontent.ToXContentObject;
1616
import org.elasticsearch.common.xcontent.XContentBuilder;
17-
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
1817
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
1918
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2019
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
20+
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
2121

2222
import java.io.IOException;
2323
import java.util.Arrays;
@@ -101,7 +101,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
101101
builder.field(DESCRIPTION.getPreferredName(), description);
102102
}
103103
builder.field(ITEMS.getPreferredName(), items);
104-
if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) {
104+
if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false)) {
105105
builder.field(TYPE.getPreferredName(), FILTER_TYPE);
106106
}
107107
builder.endObject();

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ToXContentParams.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,17 @@
1212
public final class ToXContentParams {
1313

1414
/**
15-
* Parameter to indicate whether we are serialising to X Content for cluster state output.
15+
* Parameter to indicate whether we are serialising to X Content for
16+
* internal storage. Certain fields need to be persisted but should
17+
* not be visible everywhere.
1618
*/
17-
public static final String FOR_CLUSTER_STATE = "for_cluster_state";
19+
public static final String FOR_INTERNAL_STORAGE = "for_internal_storage";
20+
21+
/**
22+
* When serialising POJOs to X Content this indicates whether the type field
23+
* should be included or not
24+
*/
25+
public static final String INCLUDE_TYPE = "include_type";
1826

1927
private ToXContentParams() {
2028
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java

+42-2
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@
88
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
99

1010
import org.elasticsearch.ElasticsearchException;
11+
import org.elasticsearch.common.bytes.BytesReference;
1112
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1213
import org.elasticsearch.common.io.stream.Writeable;
1314
import org.elasticsearch.common.settings.Settings;
1415
import org.elasticsearch.common.unit.TimeValue;
1516
import org.elasticsearch.common.xcontent.DeprecationHandler;
17+
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
1618
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
19+
import org.elasticsearch.common.xcontent.ToXContent;
1720
import org.elasticsearch.common.xcontent.XContentFactory;
21+
import org.elasticsearch.common.xcontent.XContentHelper;
1822
import org.elasticsearch.common.xcontent.XContentParseException;
1923
import org.elasticsearch.common.xcontent.XContentParser;
2024
import org.elasticsearch.common.xcontent.XContentType;
@@ -36,17 +40,22 @@
3640
import org.elasticsearch.test.ESTestCase;
3741
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig.Mode;
3842
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
43+
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
3944
import org.joda.time.DateTimeZone;
4045

4146
import java.io.IOException;
4247
import java.util.ArrayList;
4348
import java.util.Collections;
49+
import java.util.HashMap;
4450
import java.util.List;
51+
import java.util.Map;
4552
import java.util.TimeZone;
4653

4754
import static org.hamcrest.Matchers.containsString;
4855
import static org.hamcrest.Matchers.equalTo;
4956
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
57+
import static org.hamcrest.Matchers.hasEntry;
58+
import static org.hamcrest.Matchers.hasSize;
5059
import static org.hamcrest.Matchers.is;
5160
import static org.hamcrest.Matchers.lessThan;
5261
import static org.hamcrest.Matchers.not;
@@ -63,6 +72,10 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId) {
6372
}
6473

6574
public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long bucketSpanMillis) {
75+
return createRandomizedDatafeedConfigBuilder(jobId, bucketSpanMillis).build();
76+
}
77+
78+
private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(String jobId, long bucketSpanMillis) {
6679
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(randomValidDatafeedId(), jobId);
6780
builder.setIndices(randomStringList(1, 10));
6881
builder.setTypes(randomStringList(0, 10));
@@ -100,7 +113,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b
100113
if (aggHistogramInterval == null) {
101114
builder.setFrequency(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000)));
102115
} else {
103-
builder.setFrequency(TimeValue.timeValueMillis(randomIntBetween(1, 5) * aggHistogramInterval));
116+
builder.setFrequency(TimeValue.timeValueSeconds(randomIntBetween(1, 5) * aggHistogramInterval));
104117
}
105118
}
106119
if (randomBoolean()) {
@@ -109,7 +122,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b
109122
if (randomBoolean()) {
110123
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
111124
}
112-
return builder.build();
125+
return builder;
113126
}
114127

115128
@Override
@@ -167,6 +180,33 @@ public void testFutureMetadataParse() throws IOException {
167180
assertNotNull(DatafeedConfig.LENIENT_PARSER.apply(parser, null).build());
168181
}
169182

183+
public void testToXContentForInternalStorage() throws IOException {
184+
DatafeedConfig.Builder builder = createRandomizedDatafeedConfigBuilder("foo", 300);
185+
186+
// headers are only persisted to cluster state
187+
Map<String, String> headers = new HashMap<>();
188+
headers.put("header-name", "header-value");
189+
builder.setHeaders(headers);
190+
DatafeedConfig config = builder.build();
191+
192+
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"));
193+
194+
BytesReference forClusterstateXContent = XContentHelper.toXContent(config, XContentType.JSON, params, false);
195+
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
196+
.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, forClusterstateXContent.streamInput());
197+
198+
DatafeedConfig parsedConfig = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build();
199+
assertThat(parsedConfig.getHeaders(), hasEntry("header-name", "header-value"));
200+
201+
// headers are not written without the FOR_INTERNAL_STORAGE param
202+
BytesReference nonClusterstateXContent = XContentHelper.toXContent(config, XContentType.JSON, ToXContent.EMPTY_PARAMS, false);
203+
parser = XContentFactory.xContent(XContentType.JSON)
204+
.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, nonClusterstateXContent.streamInput());
205+
206+
parsedConfig = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build();
207+
assertThat(parsedConfig.getHeaders().entrySet(), hasSize(0));
208+
}
209+
170210
public void testCopyConstructor() {
171211
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
172212
DatafeedConfig datafeedConfig = createTestInstance();

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
2828
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
2929
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
30+
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
3031
import org.elasticsearch.xpack.ml.job.JobManager;
3132
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
3233

@@ -69,7 +70,7 @@ protected void doExecute(PostCalendarEventsAction.Request request,
6970
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE);
7071
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
7172
indexRequest.source(event.toXContent(builder,
72-
new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY,
73+
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE,
7374
"true"))));
7475
} catch (IOException e) {
7576
throw new IllegalStateException("Failed to serialise event", e);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
2828
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
2929
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
30+
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
3031

3132
import java.io.IOException;
3233
import java.util.Collections;
@@ -55,7 +56,7 @@ protected void doExecute(PutCalendarAction.Request request, ActionListener<PutCa
5556
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, calendar.documentId());
5657
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
5758
indexRequest.source(calendar.toXContent(builder,
58-
new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"))));
59+
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true"))));
5960
} catch (IOException e) {
6061
throw new IllegalStateException("Failed to serialise calendar with id [" + calendar.getId() + "]", e);
6162
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutFilterAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
2929
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
3030
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
31+
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
3132

3233
import java.io.IOException;
3334
import java.util.Collections;
@@ -55,7 +56,7 @@ protected void doExecute(PutFilterAction.Request request, ActionListener<PutFilt
5556
indexRequest.opType(DocWriteRequest.OpType.CREATE);
5657
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
5758
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
58-
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"));
59+
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true"));
5960
indexRequest.source(filter.toXContent(builder, params));
6061
} catch (IOException e) {
6162
throw new IllegalStateException("Failed to serialise filter with id [" + filter.getId() + "]", e);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
3838
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3939
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
40+
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
4041
import org.elasticsearch.xpack.ml.job.JobManager;
4142

4243
import java.io.IOException;
@@ -106,7 +107,7 @@ private void indexUpdatedFilter(MlFilter filter, long version, UpdateFilterActio
106107
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
107108

108109
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
109-
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"));
110+
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true"));
110111
indexRequest.source(filter.toXContent(builder, params));
111112
} catch (IOException e) {
112113
throw new IllegalStateException("Failed to serialise filter with id [" + filter.getId() + "]", e);

0 commit comments

Comments
 (0)