Skip to content

Commit 59493a2

Browse files
committed
[ML] Add Missing data checking class (#35310)
* ML: Adding missing data check class * reverting bad change * Adding bucket + missing data object for returns * reverting unnecessary change * adding license header * Make client calls synchronous, akin to DatafeedJob * Fixing line length * Renaming things, addressing PR comments
1 parent a0dcfd9 commit 59493a2

File tree

3 files changed

+468
-0
lines changed

3 files changed

+468
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.integration;
7+
8+
import org.apache.logging.log4j.Logger;
9+
import org.elasticsearch.action.bulk.BulkItemResponse;
10+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
11+
import org.elasticsearch.action.bulk.BulkResponse;
12+
import org.elasticsearch.action.index.IndexRequest;
13+
import org.elasticsearch.action.support.WriteRequest;
14+
import org.elasticsearch.common.unit.TimeValue;
15+
import org.elasticsearch.index.query.RangeQueryBuilder;
16+
import org.elasticsearch.search.aggregations.AggregationBuilders;
17+
import org.elasticsearch.search.aggregations.AggregatorFactories;
18+
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
19+
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
20+
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
21+
import org.elasticsearch.xpack.core.ml.action.util.PageParams;
22+
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
23+
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
24+
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
25+
import org.elasticsearch.xpack.core.ml.job.config.Detector;
26+
import org.elasticsearch.xpack.core.ml.job.config.Job;
27+
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
28+
import org.elasticsearch.xpack.core.ml.job.results.Result;
29+
import org.elasticsearch.xpack.ml.datafeed.DelayedDataDetector;
30+
import org.elasticsearch.xpack.ml.datafeed.DelayedDataDetector.BucketWithMissingData;
31+
import org.junit.After;
32+
import org.junit.Before;
33+
34+
import java.util.Collections;
35+
import java.util.Date;
36+
import java.util.List;
37+
38+
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
39+
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
40+
import static org.hamcrest.Matchers.equalTo;
41+
42+
public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase {
43+
44+
private String index = "delayed-data";
45+
private long now = System.currentTimeMillis();
46+
private long numDocs;
47+
48+
@Before
49+
public void putDataintoIndex() {
50+
client().admin().indices().prepareCreate(index)
51+
.addMapping("type", "time", "type=date", "value", "type=long")
52+
.get();
53+
numDocs = randomIntBetween(32, 128);
54+
long oneDayAgo = now - 86400000;
55+
writeData(logger, index, numDocs, oneDayAgo, now);
56+
}
57+
58+
@After
59+
public void cleanUpTest() {
60+
cleanUp();
61+
}
62+
63+
public void testMissingDataDetection() throws Exception {
64+
final String jobId = "delayed-data-detection-job";
65+
Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
66+
67+
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
68+
registerJob(job);
69+
putJob(job);
70+
openJob(job.getId());
71+
72+
registerDatafeed(datafeedConfig);
73+
putDatafeed(datafeedConfig);
74+
startDatafeed(datafeedConfig.getId(), 0L, now);
75+
waitUntilJobIsClosed(jobId);
76+
77+
// Get the latest finalized bucket
78+
Bucket lastBucket = getLatestFinalizedBucket(jobId);
79+
80+
DelayedDataDetector delayedDataDetector =
81+
new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());
82+
83+
List<BucketWithMissingData> response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
84+
assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(0L));
85+
86+
long missingDocs = randomIntBetween(32, 128);
87+
// Simply adding data within the current delayed data detection, the choice of 43100000 is arbitrary and within the window
88+
// for the DelayedDataDetector
89+
writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000);
90+
91+
response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
92+
assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(missingDocs));
93+
}
94+
95+
public void testMissingDataDetectionInSpecificBucket() throws Exception {
96+
final String jobId = "delayed-data-detection-job-missing-test-specific-bucket";
97+
Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
98+
99+
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
100+
registerJob(job);
101+
putJob(job);
102+
openJob(job.getId());
103+
104+
registerDatafeed(datafeedConfig);
105+
putDatafeed(datafeedConfig);
106+
107+
startDatafeed(datafeedConfig.getId(), 0L, now);
108+
waitUntilJobIsClosed(jobId);
109+
110+
// Get the latest finalized bucket
111+
Bucket lastBucket = getLatestFinalizedBucket(jobId);
112+
113+
DelayedDataDetector delayedDataDetector =
114+
new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());
115+
116+
long missingDocs = randomIntBetween(1, 10);
117+
118+
// Write our missing data in the bucket right before the last finalized bucket
119+
writeData(logger, index, missingDocs, (lastBucket.getEpoch() - lastBucket.getBucketSpan())*1000, lastBucket.getEpoch()*1000);
120+
List<BucketWithMissingData> response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
121+
122+
boolean hasBucketWithMissing = false;
123+
for (BucketWithMissingData bucketWithMissingData : response) {
124+
if (bucketWithMissingData.getBucket().getEpoch() == lastBucket.getEpoch() - lastBucket.getBucketSpan()) {
125+
assertThat(bucketWithMissingData.getMissingDocumentCount(), equalTo(missingDocs));
126+
hasBucketWithMissing = true;
127+
}
128+
}
129+
assertThat(hasBucketWithMissing, equalTo(true));
130+
}
131+
132+
public void testMissingDataDetectionWithAggregationsAndQuery() throws Exception {
133+
TimeValue bucketSpan = TimeValue.timeValueMinutes(10);
134+
final String jobId = "delayed-data-detection-job-aggs-no-missing-test";
135+
Job.Builder job = createJob(jobId, bucketSpan, "mean", "value", "doc_count");
136+
137+
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
138+
AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("value").field("value");
139+
DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed",
140+
job.getId(),
141+
Collections.singletonList(index));
142+
datafeedConfigBuilder.setAggregations(new AggregatorFactories.Builder().addAggregator(
143+
AggregationBuilders.histogram("time")
144+
.subAggregation(maxTime)
145+
.subAggregation(avgAggregationBuilder)
146+
.field("time")
147+
.interval(TimeValue.timeValueMinutes(5).millis())));
148+
datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gte(numDocs/2));
149+
datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5));
150+
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
151+
registerJob(job);
152+
putJob(job);
153+
openJob(job.getId());
154+
155+
registerDatafeed(datafeedConfig);
156+
putDatafeed(datafeedConfig);
157+
startDatafeed(datafeedConfig.getId(), 0L, now);
158+
waitUntilJobIsClosed(jobId);
159+
160+
// Get the latest finalized bucket
161+
Bucket lastBucket = getLatestFinalizedBucket(jobId);
162+
163+
DelayedDataDetector delayedDataDetector =
164+
new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());
165+
166+
List<BucketWithMissingData> response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
167+
assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(0L));
168+
169+
long missingDocs = numDocs;
170+
// Simply adding data within the current delayed data detection, the choice of 43100000 is arbitrary and within the window
171+
// for the DelayedDataDetector
172+
writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000);
173+
174+
response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
175+
assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo((missingDocs+1)/2));
176+
}
177+
178+
private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) {
179+
return createJob(id, bucketSpan, function, field, null);
180+
}
181+
182+
private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) {
183+
DataDescription.Builder dataDescription = new DataDescription.Builder();
184+
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
185+
dataDescription.setTimeField("time");
186+
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
187+
188+
Detector.Builder d = new Detector.Builder(function, field);
189+
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
190+
analysisConfig.setBucketSpan(bucketSpan);
191+
analysisConfig.setSummaryCountFieldName(summaryCountField);
192+
193+
Job.Builder builder = new Job.Builder();
194+
builder.setId(id);
195+
builder.setAnalysisConfig(analysisConfig);
196+
builder.setDataDescription(dataDescription);
197+
return builder;
198+
}
199+
200+
private void writeData(Logger logger, String index, long numDocs, long start, long end) {
201+
int maxDelta = (int) (end - start - 1);
202+
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
203+
for (int i = 0; i < numDocs; i++) {
204+
IndexRequest indexRequest = new IndexRequest(index, "type");
205+
long timestamp = start + randomIntBetween(0, maxDelta);
206+
assert timestamp >= start && timestamp < end;
207+
indexRequest.source("time", timestamp, "value", i);
208+
bulkRequestBuilder.add(indexRequest);
209+
}
210+
BulkResponse bulkResponse = bulkRequestBuilder
211+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
212+
.get();
213+
if (bulkResponse.hasFailures()) {
214+
int failures = 0;
215+
for (BulkItemResponse itemResponse : bulkResponse) {
216+
if (itemResponse.isFailed()) {
217+
failures++;
218+
logger.error("Item response failure [{}]", itemResponse.getFailureMessage());
219+
}
220+
}
221+
fail("Bulk response contained " + failures + " failures");
222+
}
223+
logger.info("Indexed [{}] documents", numDocs);
224+
}
225+
226+
private Bucket getLatestFinalizedBucket(String jobId) {
227+
GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
228+
getBucketsRequest.setExcludeInterim(true);
229+
getBucketsRequest.setSort(Result.TIMESTAMP.getPreferredName());
230+
getBucketsRequest.setDescending(true);
231+
getBucketsRequest.setPageParams(new PageParams(0, 1));
232+
return getBuckets(getBucketsRequest).get(0);
233+
}
234+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.datafeed;
7+
8+
import org.elasticsearch.action.search.SearchAction;
9+
import org.elasticsearch.action.search.SearchRequest;
10+
import org.elasticsearch.action.search.SearchResponse;
11+
import org.elasticsearch.client.Client;
12+
import org.elasticsearch.common.unit.TimeValue;
13+
import org.elasticsearch.common.util.concurrent.ThreadContext;
14+
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
15+
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
16+
import org.elasticsearch.search.builder.SearchSourceBuilder;
17+
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
18+
import org.elasticsearch.xpack.core.ml.action.util.PageParams;
19+
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
20+
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
21+
import org.elasticsearch.xpack.core.ml.job.config.Job;
22+
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
23+
import org.elasticsearch.xpack.core.ml.utils.Intervals;
24+
import org.joda.time.DateTime;
25+
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.stream.Collectors;
30+
31+
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
32+
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
33+
34+
35+
/**
36+
* This class will search the buckets and indices over a given window to determine if any data is missing
37+
*/
38+
public class DelayedDataDetector {
39+
40+
private static final String DATE_BUCKETS = "date_buckets";
41+
private final long bucketSpan;
42+
private final long window;
43+
private final DatafeedConfig datafeedConfig;
44+
private final Client client;
45+
private final Job job;
46+
47+
public DelayedDataDetector(Job job, DatafeedConfig datafeedConfig, TimeValue window, Client client) {
48+
this.job = job;
49+
this.bucketSpan = job.getAnalysisConfig().getBucketSpan().millis();
50+
this.datafeedConfig = datafeedConfig;
51+
long windowMillis = window.millis();
52+
if (windowMillis < bucketSpan) {
53+
throw new IllegalArgumentException("[window] must be greater or equal to the [bucket_span]");
54+
}
55+
if (Intervals.alignToFloor(windowMillis/bucketSpan, bucketSpan) >= 10000) {
56+
throw new IllegalArgumentException("[window] must contain less than 10000 buckets at the current [bucket_span]");
57+
}
58+
this.window = windowMillis;
59+
this.client = client;
60+
}
61+
62+
/**
63+
* This method looks at the {@link DatafeedConfig} from {@code latestFinalizedBucket - window} to {@code latestFinalizedBucket}.
64+
*
65+
* It is done synchronously, and can block for a considerable amount of time, it should only be executed within the appropriate
66+
* thread pool.
67+
*
68+
* @param latestFinalizedBucketMs The latest finalized bucket timestamp in milliseconds, signifies the end of the time window check
69+
* @return A List of {@link BucketWithMissingData} objects that contain each bucket with the current number of missing docs
70+
*/
71+
public List<BucketWithMissingData> detectMissingData(long latestFinalizedBucketMs) {
72+
final long end = Intervals.alignToFloor(latestFinalizedBucketMs, bucketSpan);
73+
final long start = Intervals.alignToFloor(latestFinalizedBucketMs - window, bucketSpan);
74+
List<Bucket> finalizedBuckets = checkBucketEvents(start, end);
75+
Map<Long, Long> indexedData = checkCurrentBucketEventCount(start, end);
76+
return finalizedBuckets.stream()
77+
// We only care about the situation when data is added to the indices
78+
// Older data could have been removed from the indices, and should not be considered "missing data"
79+
.filter(bucket -> calculateMissing(indexedData, bucket) > 0)
80+
.map(bucket -> BucketWithMissingData.fromMissingAndBucket(calculateMissing(indexedData, bucket), bucket))
81+
.collect(Collectors.toList());
82+
}
83+
84+
private List<Bucket> checkBucketEvents(long start, long end) {
85+
GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId());
86+
request.setStart(Long.toString(start));
87+
request.setEnd(Long.toString(end));
88+
request.setExcludeInterim(true);
89+
request.setPageParams(new PageParams(0, (int)((end - start)/bucketSpan)));
90+
91+
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
92+
GetBucketsAction.Response response = client.execute(GetBucketsAction.INSTANCE, request).actionGet();
93+
return response.getBuckets().results();
94+
}
95+
}
96+
97+
private Map<Long, Long> checkCurrentBucketEventCount(long start, long end) {
98+
String timeField = job.getDataDescription().getTimeField();
99+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
100+
.size(0)
101+
.aggregation(new DateHistogramAggregationBuilder(DATE_BUCKETS).interval(bucketSpan).field(timeField))
102+
.query(ExtractorUtils.wrapInTimeRangeQuery(datafeedConfig.getQuery(), timeField, start, end));
103+
104+
SearchRequest searchRequest = new SearchRequest(datafeedConfig.getIndices().toArray(new String[0])).source(searchSourceBuilder);
105+
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
106+
SearchResponse response = client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
107+
List<? extends Histogram.Bucket> buckets = ((Histogram)response.getAggregations().get(DATE_BUCKETS)).getBuckets();
108+
Map<Long, Long> hashMap = new HashMap<>(buckets.size());
109+
for (Histogram.Bucket bucket : buckets) {
110+
long bucketTime = toHistogramKeyToEpoch(bucket.getKey());
111+
if (bucketTime < 0) {
112+
throw new IllegalStateException("Histogram key [" + bucket.getKey() + "] cannot be converted to a timestamp");
113+
}
114+
hashMap.put(bucketTime, bucket.getDocCount());
115+
}
116+
return hashMap;
117+
}
118+
}
119+
120+
private static long toHistogramKeyToEpoch(Object key) {
121+
if (key instanceof DateTime) {
122+
return ((DateTime)key).getMillis();
123+
} else if (key instanceof Double) {
124+
return ((Double)key).longValue();
125+
} else if (key instanceof Long){
126+
return (Long)key;
127+
} else {
128+
return -1L;
129+
}
130+
}
131+
132+
private static long calculateMissing(Map<Long, Long> indexedData, Bucket bucket) {
133+
return indexedData.getOrDefault(bucket.getEpoch() * 1000, 0L) - bucket.getEventCount();
134+
}
135+
136+
public static class BucketWithMissingData {
137+
138+
private final long missingDocumentCount;
139+
private final Bucket bucket;
140+
141+
static BucketWithMissingData fromMissingAndBucket(long missingDocumentCount, Bucket bucket) {
142+
return new BucketWithMissingData(missingDocumentCount, bucket);
143+
}
144+
145+
private BucketWithMissingData(long missingDocumentCount, Bucket bucket) {
146+
this.missingDocumentCount = missingDocumentCount;
147+
this.bucket = bucket;
148+
}
149+
150+
public Bucket getBucket() {
151+
return bucket;
152+
}
153+
154+
public long getMissingDocumentCount() {
155+
return missingDocumentCount;
156+
}
157+
}
158+
}

0 commit comments

Comments
 (0)