Skip to content

Commit 4e3bb0e

Browse files
committed
[ML] Add option to stop datafeed that finds no data (elastic#47922)
Adds a new datafeed config option, max_empty_searches, that tells a datafeed that has never found any data to stop itself and close its associated job after a certain number of real-time searches have returned no data.
1 parent ef02a73 commit 4e3bb0e

File tree

19 files changed

+314
-70
lines changed

19 files changed

+314
-70
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class DatafeedConfig implements ToXContentObject {
6262
public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
6363
public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
6464
public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config");
65+
public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches");
6566

6667
public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
6768
"datafeed_config", true, a -> new Builder((String)a[0], (String)a[1]));
@@ -88,6 +89,7 @@ public class DatafeedConfig implements ToXContentObject {
8889
PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE);
8990
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG);
9091
PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.PARSER, DELAYED_DATA_CHECK_CONFIG);
92+
PARSER.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES);
9193
}
9294

9395
private static BytesReference parseBytes(XContentParser parser) throws IOException {
@@ -107,11 +109,12 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
107109
private final Integer scrollSize;
108110
private final ChunkingConfig chunkingConfig;
109111
private final DelayedDataCheckConfig delayedDataCheckConfig;
110-
112+
private final Integer maxEmptySearches;
111113

112114
private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
113115
BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
114-
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
116+
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig,
117+
Integer maxEmptySearches) {
115118
this.id = id;
116119
this.jobId = jobId;
117120
this.queryDelay = queryDelay;
@@ -123,6 +126,7 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue
123126
this.scrollSize = scrollSize;
124127
this.chunkingConfig = chunkingConfig;
125128
this.delayedDataCheckConfig = delayedDataCheckConfig;
129+
this.maxEmptySearches = maxEmptySearches;
126130
}
127131

128132
public String getId() {
@@ -169,6 +173,10 @@ public DelayedDataCheckConfig getDelayedDataCheckConfig() {
169173
return delayedDataCheckConfig;
170174
}
171175

176+
public Integer getMaxEmptySearches() {
177+
return maxEmptySearches;
178+
}
179+
172180
@Override
173181
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
174182
builder.startObject();
@@ -205,6 +213,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
205213
if (delayedDataCheckConfig != null) {
206214
builder.field(DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig);
207215
}
216+
if (maxEmptySearches != null) {
217+
builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches);
218+
}
208219

209220
builder.endObject();
210221
return builder;
@@ -245,7 +256,8 @@ public boolean equals(Object other) {
245256
&& Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
246257
&& Objects.equals(this.scriptFields, that.scriptFields)
247258
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
248-
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig);
259+
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
260+
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches);
249261
}
250262

251263
/**
@@ -256,7 +268,7 @@ public boolean equals(Object other) {
256268
@Override
257269
public int hashCode() {
258270
return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
259-
chunkingConfig, delayedDataCheckConfig);
271+
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
260272
}
261273

262274
public static Builder builder(String id, String jobId) {
@@ -276,6 +288,7 @@ public static class Builder {
276288
private Integer scrollSize;
277289
private ChunkingConfig chunkingConfig;
278290
private DelayedDataCheckConfig delayedDataCheckConfig;
291+
private Integer maxEmptySearches;
279292

280293
public Builder(String id, String jobId) {
281294
this.id = Objects.requireNonNull(id, ID.getPreferredName());
@@ -294,6 +307,7 @@ public Builder(DatafeedConfig config) {
294307
this.scrollSize = config.scrollSize;
295308
this.chunkingConfig = config.chunkingConfig;
296309
this.delayedDataCheckConfig = config.getDelayedDataCheckConfig();
310+
this.maxEmptySearches = config.getMaxEmptySearches();
297311
}
298312

299313
public Builder setIndices(List<String> indices) {
@@ -376,9 +390,14 @@ public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheck
376390
return this;
377391
}
378392

393+
public Builder setMaxEmptySearches(int maxEmptySearches) {
394+
this.maxEmptySearches = maxEmptySearches;
395+
return this;
396+
}
397+
379398
public DatafeedConfig build() {
380399
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
381-
chunkingConfig, delayedDataCheckConfig);
400+
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
382401
}
383402

384403
private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {

client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public class DatafeedUpdate implements ToXContentObject {
7979
PARSER.declareObject(Builder::setDelayedDataCheckConfig,
8080
DelayedDataCheckConfig.PARSER,
8181
DatafeedConfig.DELAYED_DATA_CHECK_CONFIG);
82+
PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES);
8283
}
8384

8485
private static BytesReference parseBytes(XContentParser parser) throws IOException {
@@ -98,10 +99,12 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
9899
private final Integer scrollSize;
99100
private final ChunkingConfig chunkingConfig;
100101
private final DelayedDataCheckConfig delayedDataCheckConfig;
102+
private final Integer maxEmptySearches;
101103

102104
private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
103105
BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
104-
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
106+
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig,
107+
Integer maxEmptySearches) {
105108
this.id = id;
106109
this.jobId = jobId;
107110
this.queryDelay = queryDelay;
@@ -113,6 +116,7 @@ private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue
113116
this.scrollSize = scrollSize;
114117
this.chunkingConfig = chunkingConfig;
115118
this.delayedDataCheckConfig = delayedDataCheckConfig;
119+
this.maxEmptySearches = maxEmptySearches;
116120
}
117121

118122
/**
@@ -152,6 +156,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
152156
}
153157
addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize);
154158
addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
159+
addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches);
155160
builder.endObject();
156161
return builder;
157162
}
@@ -202,6 +207,10 @@ public DelayedDataCheckConfig getDelayedDataCheckConfig() {
202207
return delayedDataCheckConfig;
203208
}
204209

210+
public Integer getMaxEmptySearches() {
211+
return maxEmptySearches;
212+
}
213+
205214
private static Map<String, Object> asMap(BytesReference bytesReference) {
206215
return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2();
207216
}
@@ -237,7 +246,8 @@ public boolean equals(Object other) {
237246
&& Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
238247
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
239248
&& Objects.equals(this.scriptFields, that.scriptFields)
240-
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
249+
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
250+
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches);
241251
}
242252

243253
/**
@@ -248,7 +258,7 @@ public boolean equals(Object other) {
248258
@Override
249259
public int hashCode() {
250260
return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
251-
chunkingConfig, delayedDataCheckConfig);
261+
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
252262
}
253263

254264
public static Builder builder(String id) {
@@ -268,6 +278,7 @@ public static class Builder {
268278
private Integer scrollSize;
269279
private ChunkingConfig chunkingConfig;
270280
private DelayedDataCheckConfig delayedDataCheckConfig;
281+
private Integer maxEmptySearches;
271282

272283
public Builder(String id) {
273284
this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName());
@@ -285,6 +296,7 @@ public Builder(DatafeedUpdate config) {
285296
this.scrollSize = config.scrollSize;
286297
this.chunkingConfig = config.chunkingConfig;
287298
this.delayedDataCheckConfig = config.delayedDataCheckConfig;
299+
this.maxEmptySearches = config.maxEmptySearches;
288300
}
289301

290302
@Deprecated
@@ -364,9 +376,14 @@ public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheck
364376
return this;
365377
}
366378

379+
public Builder setMaxEmptySearches(int maxEmptySearches) {
380+
this.maxEmptySearches = maxEmptySearches;
381+
return this;
382+
}
383+
367384
public DatafeedUpdate build() {
368385
return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
369-
chunkingConfig, delayedDataCheckConfig);
386+
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
370387
}
371388

372389
private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {

client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ public static DatafeedConfig.Builder createRandomBuilder() {
106106
if (randomBoolean()) {
107107
builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig());
108108
}
109+
if (randomBoolean()) {
110+
builder.setMaxEmptySearches(randomIntBetween(10, 100));
111+
}
109112
return builder;
110113
}
111114

client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ public static DatafeedUpdate createRandom() {
8383
if (randomBoolean()) {
8484
builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig());
8585
}
86+
if (randomBoolean()) {
87+
builder.setMaxEmptySearches(randomIntBetween(10, 100));
88+
}
8689
return builder.build();
8790
}
8891

docs/reference/ml/anomaly-detection/apis/datafeedresource.asciidoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,15 @@ A {dfeed} resource has the following properties:
6565
`{"enabled": true, "check_window": "1h"}` See
6666
<<ml-datafeed-delayed-data-check-config>>.
6767

68+
`max_empty_searches`::
69+
(integer) If a real-time {dfeed} has never seen any data (including during
70+
any initial training period) then it will automatically stop itself and
71+
close its associated job after this many real-time searches that return no
72+
documents. In other words, it will stop after a period of real-time operation
73+
equal to `frequency` multiplied by `max_empty_searches`. If not set,
74+
then a {dfeed} with no end time that sees no data will remain started until
75+
it is explicitly stopped. By default this setting is not set.
76+
6877
[[ml-datafeed-chunking-config]]
6978
==== Chunking configuration objects
7079

docs/reference/ml/anomaly-detection/apis/update-datafeed.asciidoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,15 @@ parallel and close one when you are satisfied with the results of the other job.
101101
(Optional, unsigned integer) The `size` parameter that is used in {es}
102102
searches. The default value is `1000`.
103103

104+
`max_empty_searches`::
105+
(Optional, integer) If a real-time {dfeed} has never seen any data (including
106+
during any initial training period) then it will automatically stop itself
107+
and close its associated job after this many real-time searches that return
108+
no documents. In other words, it will stop after a period of real-time
109+
operation equal to `frequency` multiplied by `max_empty_searches`. If not set,
110+
then a {dfeed} with no end time that sees no data will remain started until
111+
it is explicitly stopped. The special value `-1` unsets this setting.
112+
104113
For more information about these properties, see <<ml-datafeed-resource>>.
105114

106115

0 commit comments

Comments
 (0)