diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java
new file mode 100644
index 0000000000000..c672fadde7812
--- /dev/null
+++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java
@@ -0,0 +1,234 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.integration;
+
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
+import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
+import org.elasticsearch.xpack.core.ml.action.util.PageParams;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
+import org.elasticsearch.xpack.core.ml.job.config.Detector;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.results.Bucket;
+import org.elasticsearch.xpack.core.ml.job.results.Result;
+import org.elasticsearch.xpack.ml.datafeed.DelayedDataDetector;
+import org.elasticsearch.xpack.ml.datafeed.DelayedDataDetector.BucketWithMissingData;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
+import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
+import static org.hamcrest.Matchers.equalTo;
+
+public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase {
+
+    private String index = "delayed-data";
+    private long now = System.currentTimeMillis();
+    private long numDocs;
+
+    @Before
+    public void putDataIntoIndex() {
+        client().admin().indices().prepareCreate(index)
+            .addMapping("type", "time", "type=date", "value", "type=long")
+            .get();
+        numDocs = randomIntBetween(32, 128);
+        long oneDayAgo = now - 86400000;
+        writeData(logger, index, numDocs, oneDayAgo, now);
+    }
+
+    @After
+    public void cleanUpTest() {
+        cleanUp();
+    }
+
+    public void testMissingDataDetection() throws Exception {
+        final String jobId = "delayed-data-detection-job";
+        Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
+
+        DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
+        registerJob(job);
+        putJob(job);
+        openJob(job.getId());
+
+        registerDatafeed(datafeedConfig);
+        putDatafeed(datafeedConfig);
+        startDatafeed(datafeedConfig.getId(), 0L, now);
+        waitUntilJobIsClosed(jobId);
+
+        // Get the latest finalized bucket
+        Bucket lastBucket = getLatestFinalizedBucket(jobId);
+
+        DelayedDataDetector delayedDataDetector =
+            new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());
+
+        List<BucketWithMissingData> response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
+        assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(0L));
+
+        long missingDocs = randomIntBetween(32, 128);
+        // Simply add data within the current delayed data detection window; the choice of 43100000 is arbitrary but lies
+        // within the window used by the DelayedDataDetector
+        writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000);
+
+        response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
+        assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(missingDocs));
+    }
+
+    public void testMissingDataDetectionInSpecificBucket() throws Exception {
+        final String jobId = "delayed-data-detection-job-missing-test-specific-bucket";
+        Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
+
+        DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
+        registerJob(job);
+        putJob(job);
+        openJob(job.getId());
+
+        registerDatafeed(datafeedConfig);
+        putDatafeed(datafeedConfig);
+
+        startDatafeed(datafeedConfig.getId(), 0L, now);
+        waitUntilJobIsClosed(jobId);
+
+        // Get the latest finalized bucket
+        Bucket lastBucket = getLatestFinalizedBucket(jobId);
+
+        DelayedDataDetector delayedDataDetector =
+            new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());
+
+        long missingDocs = randomIntBetween(1, 10);
+
+        // Write our missing data in the bucket right before the last finalized bucket
+        writeData(logger, index, missingDocs, (lastBucket.getEpoch() - lastBucket.getBucketSpan())*1000, lastBucket.getEpoch()*1000);
+        List<BucketWithMissingData> response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
+
+        boolean hasBucketWithMissing = false;
+        for (BucketWithMissingData bucketWithMissingData : response) {
+            if (bucketWithMissingData.getBucket().getEpoch() == lastBucket.getEpoch() - lastBucket.getBucketSpan()) {
+                assertThat(bucketWithMissingData.getMissingDocumentCount(), equalTo(missingDocs));
+                hasBucketWithMissing = true;
+            }
+        }
+        assertThat(hasBucketWithMissing, equalTo(true));
+    }
+
+    public void testMissingDataDetectionWithAggregationsAndQuery() throws Exception {
+        TimeValue bucketSpan = TimeValue.timeValueMinutes(10);
+        final String jobId = "delayed-data-detection-job-aggs-no-missing-test";
+        Job.Builder job = createJob(jobId, bucketSpan, "mean", "value", "doc_count");
+
+        MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
+        AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("value").field("value");
+        DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed",
+            job.getId(),
+            Collections.singletonList(index));
+        datafeedConfigBuilder.setAggregations(new AggregatorFactories.Builder().addAggregator(
+            AggregationBuilders.histogram("time")
+                .subAggregation(maxTime)
+                .subAggregation(avgAggregationBuilder)
+                .field("time")
+                .interval(TimeValue.timeValueMinutes(5).millis())));
+        datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gte(numDocs/2));
RangeQueryBuilder("value").gte(numDocs/2)); + datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5)); + DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); + registerJob(job); + putJob(job); + openJob(job.getId()); + + registerDatafeed(datafeedConfig); + putDatafeed(datafeedConfig); + startDatafeed(datafeedConfig.getId(), 0L, now); + waitUntilJobIsClosed(jobId); + + // Get the latest finalized bucket + Bucket lastBucket = getLatestFinalizedBucket(jobId); + + DelayedDataDetector delayedDataDetector = + new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); + + List response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000); + assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(0L)); + + long missingDocs = numDocs; + // Simply adding data within the current delayed data detection, the choice of 43100000 is arbitrary and within the window + // for the DelayedDataDetector + writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000); + + response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000); + assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo((missingDocs+1)/2)); + } + + private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) { + return createJob(id, bucketSpan, function, field, null); + } + + private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); + dataDescription.setTimeField("time"); + dataDescription.setTimeFormat(DataDescription.EPOCH_MS); + + Detector.Builder d = new Detector.Builder(function, field); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + analysisConfig.setBucketSpan(bucketSpan); + analysisConfig.setSummaryCountFieldName(summaryCountField); + + Job.Builder builder = new Job.Builder(); + builder.setId(id); + builder.setAnalysisConfig(analysisConfig); + builder.setDataDescription(dataDescription); + return builder; + } + + private void writeData(Logger logger, String index, long numDocs, long start, long end) { + int maxDelta = (int) (end - start - 1); + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + for (int i = 0; i < numDocs; i++) { + IndexRequest indexRequest = new IndexRequest(index, "type"); + long timestamp = start + randomIntBetween(0, maxDelta); + assert timestamp >= start && timestamp < end; + indexRequest.source("time", timestamp, "value", i); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + if (bulkResponse.hasFailures()) { + int failures = 0; + for (BulkItemResponse itemResponse : bulkResponse) { + if (itemResponse.isFailed()) { + failures++; + logger.error("Item response failure [{}]", itemResponse.getFailureMessage()); + } + } + fail("Bulk response contained " + failures + " failures"); + } + logger.info("Indexed [{}] documents", numDocs); + } + + private Bucket getLatestFinalizedBucket(String jobId) { + GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId); + getBucketsRequest.setExcludeInterim(true); + getBucketsRequest.setSort(Result.TIMESTAMP.getPreferredName()); + 
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java
new file mode 100644
index 0000000000000..3c7c6ff963e07
--- /dev/null
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed;
+
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
+import org.elasticsearch.xpack.core.ml.action.util.PageParams;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.results.Bucket;
+import org.elasticsearch.xpack.core.ml.utils.Intervals;
+import org.joda.time.DateTime;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
+import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
+
+/**
+ * This class searches the job's finalized buckets and the datafeed's indices over a given window to determine whether
+ * any data is missing
+ */
+public class DelayedDataDetector {
+
+    private static final String DATE_BUCKETS = "date_buckets";
+    private final long bucketSpan;
+    private final long window;
+    private final DatafeedConfig datafeedConfig;
+    private final Client client;
+    private final Job job;
+
+    public DelayedDataDetector(Job job, DatafeedConfig datafeedConfig, TimeValue window, Client client) {
+        this.job = job;
+        this.bucketSpan = job.getAnalysisConfig().getBucketSpan().millis();
+        this.datafeedConfig = datafeedConfig;
+        long windowMillis = window.millis();
+        if (windowMillis < bucketSpan) {
+            throw new IllegalArgumentException("[window] must be greater or equal to the [bucket_span]");
+        }
+        if (Intervals.alignToFloor(windowMillis/bucketSpan, bucketSpan) >= 10000) {
+            throw new IllegalArgumentException("[window] must contain less than 10000 buckets at the current [bucket_span]");
+        }
+        this.window = windowMillis;
+        this.client = client;
+    }
+
+    /**
+     * This method looks at the {@link DatafeedConfig} from {@code latestFinalizedBucket - window} to {@code latestFinalizedBucket}.
+     *
+     * It runs synchronously and can block for a considerable amount of time; it should only be executed within the appropriate
+     * thread pool.
+     *
+     * @param latestFinalizedBucketMs The latest finalized bucket timestamp in milliseconds; it marks the end of the window to check
+     * @return A List of {@link BucketWithMissingData} objects that pair each bucket with its current number of missing docs
+     */
+    public List<BucketWithMissingData> detectMissingData(long latestFinalizedBucketMs) {
+        final long end = Intervals.alignToFloor(latestFinalizedBucketMs, bucketSpan);
+        final long start = Intervals.alignToFloor(latestFinalizedBucketMs - window, bucketSpan);
+        List<Bucket> finalizedBuckets = checkBucketEvents(start, end);
+        Map<Long, Long> indexedData = checkCurrentBucketEventCount(start, end);
+        return finalizedBuckets.stream()
+            // We only care about the situation where data is added to the indices
+            // Older data could have been removed from the indices, and should not be considered "missing data"
+            .filter(bucket -> calculateMissing(indexedData, bucket) > 0)
+            .map(bucket -> BucketWithMissingData.fromMissingAndBucket(calculateMissing(indexedData, bucket), bucket))
+            .collect(Collectors.toList());
+    }
+
+    private List<Bucket> checkBucketEvents(long start, long end) {
+        GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId());
+        request.setStart(Long.toString(start));
+        request.setEnd(Long.toString(end));
+        request.setExcludeInterim(true);
+        request.setPageParams(new PageParams(0, (int)((end - start)/bucketSpan)));
+
+        try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
+            GetBucketsAction.Response response = client.execute(GetBucketsAction.INSTANCE, request).actionGet();
+            return response.getBuckets().results();
+        }
+    }
+
+    private Map<Long, Long> checkCurrentBucketEventCount(long start, long end) {
+        String timeField = job.getDataDescription().getTimeField();
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
+            .size(0)
+            .aggregation(new DateHistogramAggregationBuilder(DATE_BUCKETS).interval(bucketSpan).field(timeField))
+            .query(ExtractorUtils.wrapInTimeRangeQuery(datafeedConfig.getQuery(), timeField, start, end));
+
+        SearchRequest searchRequest = new SearchRequest(datafeedConfig.getIndices().toArray(new String[0])).source(searchSourceBuilder);
+        try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
+            SearchResponse response = client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
+            List<? extends Histogram.Bucket> buckets = ((Histogram)response.getAggregations().get(DATE_BUCKETS)).getBuckets();
+            Map<Long, Long> hashMap = new HashMap<>(buckets.size());
+            for (Histogram.Bucket bucket : buckets) {
+                long bucketTime = toHistogramKeyToEpoch(bucket.getKey());
+                if (bucketTime < 0) {
+                    throw new IllegalStateException("Histogram key [" + bucket.getKey() + "] cannot be converted to a timestamp");
+                }
+                hashMap.put(bucketTime, bucket.getDocCount());
+            }
+            return hashMap;
+        }
+    }
+
+    private static long toHistogramKeyToEpoch(Object key) {
+        if (key instanceof DateTime) {
+            return ((DateTime)key).getMillis();
+        } else if (key instanceof Double) {
+            return ((Double)key).longValue();
+        } else if (key instanceof Long) {
+            return (Long)key;
+        } else {
+            return -1L;
+        }
+    }
+
+    private static long calculateMissing(Map<Long, Long> indexedData, Bucket bucket) {
+        // Bucket epochs are in seconds while the histogram keys are in milliseconds, hence the multiplication
+        return indexedData.getOrDefault(bucket.getEpoch() * 1000, 0L) - bucket.getEventCount();
+    }
+
+    public static class BucketWithMissingData {
+
+        private final long missingDocumentCount;
+        private final Bucket bucket;
+
+        static BucketWithMissingData fromMissingAndBucket(long missingDocumentCount, Bucket bucket) {
+            return new BucketWithMissingData(missingDocumentCount, bucket);
+        }
+
+        private BucketWithMissingData(long missingDocumentCount, Bucket bucket) {
+            this.missingDocumentCount = missingDocumentCount;
+            this.bucket = bucket;
+        }
+
+        public Bucket getBucket() {
+            return bucket;
+        }
+
+        public long getMissingDocumentCount() {
+            return missingDocumentCount;
+        }
+    }
+}
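Note for reviewers: this change adds the detector and its tests but no production call site. The sketch below shows one way a datafeed runner might drive it; the names in scope (`job`, `datafeedConfig`, `client`, `lastFinalizedBucket`, `logger`), the 12 hour window, and the warn-level logging are illustrative assumptions, not part of this diff:

    // Hypothetical call site. Must run on an ML utility thread, because
    // detectMissingData(...) performs two synchronous, blocking requests.
    DelayedDataDetector detector =
        new DelayedDataDetector(job, datafeedConfig, TimeValue.timeValueHours(12), client);
    long latestFinalizedBucketMs = lastFinalizedBucket.getEpoch() * 1000;   // Bucket epochs are in seconds
    for (DelayedDataDetector.BucketWithMissingData missing : detector.detectMissingData(latestFinalizedBucketMs)) {
        logger.warn("[{}] [{}] documents arrived after bucket [{}] was finalized",
            job.getId(), missing.getMissingDocumentCount(), missing.getBucket().getTimestamp());
    }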
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetectorTests.java
new file mode 100644
index 0000000000000..9a54181af9ce6
--- /dev/null
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetectorTests.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
+import org.elasticsearch.xpack.core.ml.job.config.Detector;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+
+import java.util.Collections;
+import java.util.Date;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+
+public class DelayedDataDetectorTests extends ESTestCase {
+
+    public void testConstructorWithValidValues() {
+        TimeValue window = TimeValue.timeValueSeconds(10);
+        Job job = createJob(TimeValue.timeValueSeconds(1));
+        DelayedDataDetector delayedDataDetector = new DelayedDataDetector(job, createDatafeed(), window, mock(Client.class));
+        assertNotNull(delayedDataDetector);
+    }
+
+    public void testConstructorWithInvalidValues() {
+        TimeValue shortWindow = TimeValue.timeValueMillis(500);
+        Job job = createJob(TimeValue.timeValueSeconds(1));
+
+        Exception exception = expectThrows(IllegalArgumentException.class,
+            () -> new DelayedDataDetector(job, createDatafeed(), shortWindow, mock(Client.class)));
+        assertThat(exception.getMessage(), equalTo("[window] must be greater or equal to the [bucket_span]"));
+
+        TimeValue longWindow = TimeValue.timeValueSeconds(20000);
+
+        exception = expectThrows(IllegalArgumentException.class,
+            () -> new DelayedDataDetector(job, createDatafeed(), longWindow, mock(Client.class)));
+        assertThat(exception.getMessage(), equalTo("[window] must contain less than 10000 buckets at the current [bucket_span]"));
+    }
+
+    private Job createJob(TimeValue bucketSpan) {
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
+        dataDescription.setTimeField("time");
+        dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
+
+        Detector.Builder d = new Detector.Builder("count", null);
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
+        analysisConfig.setBucketSpan(bucketSpan);
+
+        Job.Builder builder = new Job.Builder();
+        builder.setId("test-job");
+        builder.setAnalysisConfig(analysisConfig);
+        builder.setDataDescription(dataDescription);
+        return builder.build(new Date());
+    }
+
+    private DatafeedConfig createDatafeed() {
+        DatafeedConfig.Builder builder = new DatafeedConfig.Builder("id", "jobId");
+        builder.setIndices(Collections.singletonList("index1"));
+        builder.setTypes(Collections.singletonList("doc"));
+        return builder.build();
+    }
+}
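One subtlety that is easy to miss when reading `calculateMissing`: ML bucket epochs are expressed in seconds while the date histogram keys are epoch milliseconds, hence the multiplication by 1000 before the map lookup. A standalone walk-through of the arithmetic with invented counts (40 events processed for a bucket whose index data has since grown to 55 matching documents):

    // Invented numbers; mirrors DelayedDataDetector.calculateMissing(...).
    Map<Long, Long> indexedData = new HashMap<>();
    long bucketEpochSeconds = 1540000000L;                      // Bucket#getEpoch() is in seconds
    indexedData.put(bucketEpochSeconds * 1000, 55L);            // histogram keys are epoch millis
    long eventCount = 40L;                                      // Bucket#getEventCount()
    long missing = indexedData.getOrDefault(bucketEpochSeconds * 1000, 0L) - eventCount;
    assert missing == 15L;                                      // surfaced as a BucketWithMissingData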