diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java
index debe1c36bbace..80223027e8ee0 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java
@@ -12,8 +12,9 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
-import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
+import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
+import org.elasticsearch.xpack.ml.job.process.diagnostics.DataStreamDiagnostics;
 
 import java.util.Date;
 import java.util.Locale;
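The only change to `DataCountsReporter` is its import block: `DataStreamDiagnostics` now lives in the new `job.process.diagnostics` subpackage. For orientation, a minimal sketch of the kind of call site that import serves — hypothetical, since the class body is unchanged and not part of this hunk:

```java
// Hypothetical sketch: DataCountsReporter is assumed to hold a
// DataStreamDiagnostics instance and feed it each record it reports.
private final DataStreamDiagnostics diagnostics;

public void reportRecordWritten(long inputFieldCount, long recordTimeMs) {
    // ...record/field count bookkeeping elided...
    diagnostics.checkRecord(recordTimeMs); // drives bucket/empty/sparse statistics
}
```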
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java
deleted file mode 100644
index b733204258689..0000000000000
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-package org.elasticsearch.xpack.ml.job.process;
-
-import org.apache.logging.log4j.Logger;
-import org.apache.lucene.util.Counter;
-import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.xpack.core.ml.job.config.Job;
-
-import java.util.Date;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-public class DataStreamDiagnostics {
-
-    /**
-     * Minimum window to take into consideration for bucket count histogram.
-     */
-    private static final int MIN_BUCKET_WINDOW = 10;
-
-    /**
-     * Threshold to report potential sparsity problems.
-     *
-     * Sparsity score is calculated: log(average) - log(current)
-     *
-     * If score is above the threshold, bucket is reported as sparse bucket.
-     */
-    private static final int DATA_SPARSITY_THRESHOLD = 2;
-    private static final long MS_IN_SECOND = 1000;
-
-    private static final Logger LOGGER = Loggers.getLogger(DataStreamDiagnostics.class);
-    /**
-     * Container for the histogram
-     *
-     * Note: Using a sorted map in order to iterate in order when consuming the
-     * data. The counter is lazily initialized and potentially missing in case
-     * of empty buckets.
-     *
-     * The container gets pruned along the data streaming based on the bucket
-     * window, so it should not contain more than max(MIN_BUCKET_WINDOW,
-     * 'buckets_required_by_latency') + 1 items at any time.
-     *
-     * Sparsity can only be calculated after the window has been filled. Currently
-     * this window is lost if a job gets closed and re-opened. We might fix this
-     * in future.
-     */
-    private final SortedMap<Long, Counter> movingBucketHistogram = new TreeMap<>();
-
-    private final long bucketSpan;
-    private final long latency;
-    private long movingBucketCount = 0;
-    private long latestReportedBucket = -1;
-
-    private long bucketCount = 0;
-    private long emptyBucketCount = 0;
-    private long latestEmptyBucketTime = -1;
-    private long sparseBucketCount = 0;
-    private long latestSparseBucketTime = -1;
-
-    public DataStreamDiagnostics(Job job) {
-        bucketSpan = job.getAnalysisConfig().getBucketSpan().seconds();
-        latency = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().seconds();
-    }
-
-    /**
-     * Check record
-     *
-     * @param recordTimestampInMs
-     *            The record timestamp in milliseconds since epoch
-     */
-
-    public void checkRecord(long recordTimestampInMs) {
-        checkBucketing(recordTimestampInMs);
-    }
-
-    /**
-     * Flush all counters, should be called at the end of the data stream
-     */
-    public void flush() {
-        // flush all we know
-        if (movingBucketHistogram.isEmpty() == false) {
-            flush(movingBucketHistogram.lastKey());
-        }
-    }
-
-    /**
-     * Check bucketing of record. Report empty and sparse buckets.
-     *
-     * @param recordTimestampInMs
-     *            The record timestamp in milliseconds since epoch
-     */
-    private void checkBucketing(long recordTimestampInMs) {
-        long bucket = (recordTimestampInMs / MS_IN_SECOND) / bucketSpan;
-        long bucketHistogramStartBucket = ((recordTimestampInMs / MS_IN_SECOND) - latency) / bucketSpan;
-
-        bucketHistogramStartBucket = Math.min(bucket - MIN_BUCKET_WINDOW, bucketHistogramStartBucket);
-
-        movingBucketHistogram.computeIfAbsent(bucket, l -> Counter.newCounter()).addAndGet(1);
-        ++movingBucketCount;
-
-        // find the very first bucket
-        if (latestReportedBucket == -1) {
-            latestReportedBucket = bucket - 1;
-        }
-
-        // flush all bucket out of the window
-        flush(bucketHistogramStartBucket);
-    }
-
-    /**
-     * Flush Bucket reporting till the given bucket.
-     *
-     * @param bucketNumber
-     *            The number of the last bucket that can be flushed.
-     */
-    private void flush(long bucketNumber) {
-
-        // check for a longer period of empty buckets
-        long emptyBuckets = movingBucketHistogram.firstKey() - latestReportedBucket - 1;
-        if (emptyBuckets > 0) {
-            bucketCount += emptyBuckets;
-            emptyBucketCount += emptyBuckets;
-            latestEmptyBucketTime = (movingBucketHistogram.firstKey() - 1) * bucketSpan * MS_IN_SECOND;
-            latestReportedBucket = movingBucketHistogram.firstKey() - 1;
-        }
-
-        // calculate the average number of data points in a bucket based on the
-        // current history
-        double averageBucketSize = (float) movingBucketCount / movingBucketHistogram.size();
-
-        // prune all buckets that can be flushed
-        long lastBucketSparsityCheck = Math.min(bucketNumber, movingBucketHistogram.lastKey());
-
-        for (long pruneBucket = movingBucketHistogram.firstKey(); pruneBucket < lastBucketSparsityCheck; ++pruneBucket) {
-
-            Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket);
-            long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L;
-
-            LOGGER.debug("Checking bucket {} compare sizes, this bucket: {} average: {}", pruneBucket, bucketSize, averageBucketSize);
-            ++bucketCount;
-            latestReportedBucket = pruneBucket;
-
-            // substract bucketSize from the counter
-            movingBucketCount -= bucketSize;
-
-            // check if bucket is empty
-            if (bucketSize == 0L) {
-                latestEmptyBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND;
-                ++emptyBucketCount;
-
-                // do not do sparse analysis on an empty bucket
-                continue;
-            }
-
-            // simplistic way to calculate data sparsity, just take the log and
-            // check the difference
-            double logAverageBucketSize = Math.log(averageBucketSize);
-            double logBucketSize = Math.log(bucketSize);
-            double sparsityScore = logAverageBucketSize - logBucketSize;
-
-            if (sparsityScore > DATA_SPARSITY_THRESHOLD) {
-                LOGGER.debug("Sparse bucket {}, this bucket: {} average: {}, sparsity score: {}", pruneBucket, bucketSize,
-                        averageBucketSize, sparsityScore);
-                ++sparseBucketCount;
-                latestSparseBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND;
-            }
-        }
-
-        // prune the rest if necessary
-        for (long pruneBucket = lastBucketSparsityCheck; pruneBucket < bucketNumber; ++pruneBucket) {
-            Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket);
-            long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L;
-
-            bucketCount++;
-            latestReportedBucket = pruneBucket;
-
-            // substract bucketSize from the counter
-            movingBucketCount -= bucketSize;
-
-            // check if bucket is empty
-            if (bucketSize == 0L) {
-                latestEmptyBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND;
-                ++emptyBucketCount;
-            }
-        }
-    }
-
-    public long getBucketCount() {
-        return bucketCount;
-    }
-
-    public long getEmptyBucketCount() {
-        return emptyBucketCount;
-    }
-
-    public Date getLatestEmptyBucketTime() {
-        return latestEmptyBucketTime > 0 ? new Date(latestEmptyBucketTime) : null;
-    }
-
-    public long getSparseBucketCount() {
-        return sparseBucketCount;
-    }
-
-    public Date getLatestSparseBucketTime() {
-        return latestSparseBucketTime > 0 ? new Date(latestSparseBucketTime) : null;
-    }
-
-    /**
-     * Resets counts,
-     *
-     * Note: This does not reset the inner state for e.g. sparse bucket
-     * detection.
-     *
-     */
-    public void resetCounts() {
-        bucketCount = 0;
-        emptyBucketCount = 0;
-        sparseBucketCount = 0;
-    }
-}
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java
new file mode 100644
index 0000000000000..c61926dfb0426
--- /dev/null
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.job.process.diagnostics;
+
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.utils.Intervals;
+
+/**
+ * A moving window of buckets that allows keeping
+ * track of some statistics like the bucket count,
+ * empty or sparse buckets, etc.
+ *
+ * The counts are stored in an array that functions as a
+ * circular buffer. When time is advanced, all buckets
+ * out of the window are flushed.
+ */
+ */ +class BucketDiagnostics { + + private static final int MIN_BUCKETS = 10; + + private final long bucketSpanMs; + private final long latencyMs; + private final int maxSize; + private final long[] buckets; + private long movingBucketCount = 0; + private long latestBucketStartMs = -1; + private int latestBucketIndex; + private long earliestBucketStartMs = -1; + private int earliestBucketIndex; + private long latestFlushedBucketStartMs = -1; + private final BucketFlushListener bucketFlushListener; + + BucketDiagnostics(Job job, BucketFlushListener bucketFlushListener) { + bucketSpanMs = job.getAnalysisConfig().getBucketSpan().millis(); + latencyMs = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().millis(); + maxSize = Math.max((int) (Intervals.alignToCeil(latencyMs, bucketSpanMs) / bucketSpanMs), MIN_BUCKETS); + buckets = new long[maxSize]; + this.bucketFlushListener = bucketFlushListener; + } + + void addRecord(long recordTimestampMs) { + long bucketStartMs = Intervals.alignToFloor(recordTimestampMs, bucketSpanMs); + + // Initialize earliest/latest times + if (latestBucketStartMs < 0) { + latestBucketStartMs = bucketStartMs; + earliestBucketStartMs = bucketStartMs; + } + + advanceTime(bucketStartMs); + addToBucket(bucketStartMs); + } + + private void advanceTime(long bucketStartMs) { + while (bucketStartMs > latestBucketStartMs) { + int flushBucketIndex = (latestBucketIndex + 1) % maxSize; + + if (flushBucketIndex == earliestBucketIndex) { + flush(flushBucketIndex); + movingBucketCount -= buckets[flushBucketIndex]; + earliestBucketStartMs += bucketSpanMs; + earliestBucketIndex = (earliestBucketIndex + 1) % maxSize; + } + buckets[flushBucketIndex] = 0L; + + latestBucketStartMs += bucketSpanMs; + latestBucketIndex = flushBucketIndex; + } + } + + private void addToBucket(long bucketStartMs) { + int offsetToLatest = (int) ((bucketStartMs - latestBucketStartMs) / bucketSpanMs); + int bucketIndex = (latestBucketIndex + offsetToLatest) % maxSize; + if (bucketIndex < 0) { + bucketIndex = maxSize + bucketIndex; + } + + ++buckets[bucketIndex]; + ++movingBucketCount; + + if (bucketStartMs < earliestBucketStartMs) { + earliestBucketStartMs = bucketStartMs; + earliestBucketIndex = bucketIndex; + } + } + + private void flush(int bucketIndex) { + long bucketStartMs = getTimestampMs(bucketIndex); + if (bucketStartMs > latestFlushedBucketStartMs) { + bucketFlushListener.onBucketFlush(bucketStartMs, buckets[bucketIndex]); + latestFlushedBucketStartMs = bucketStartMs; + } + } + + private long getTimestampMs(int bucketIndex) { + int offsetToLatest = latestBucketIndex - bucketIndex; + if (offsetToLatest < 0) { + offsetToLatest = maxSize + offsetToLatest; + } + return latestBucketStartMs - offsetToLatest * bucketSpanMs; + } + + void flush() { + if (latestBucketStartMs < 0) { + return; + } + + int bucketIndex = earliestBucketIndex; + while (bucketIndex != latestBucketIndex) { + flush(bucketIndex); + bucketIndex = (bucketIndex + 1) % maxSize; + } + } + + double averageBucketCount() { + return (double) movingBucketCount / size(); + } + + private int size() { + if (latestBucketStartMs < 0) { + return 0; + } + return (int) ((latestBucketStartMs - earliestBucketStartMs) / bucketSpanMs) + 1; + } + + interface BucketFlushListener { + void onBucketFlush(long bucketStartMs, long bucketCounts); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java 
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java
new file mode 100644
index 0000000000000..a19f6eba02367
--- /dev/null
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.job.process.diagnostics;
+
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+
+import java.util.Date;
+
+public class DataStreamDiagnostics {
+
+    /**
+     * Threshold to report potential sparsity problems.
+     *
+     * Sparsity score is calculated: log(average) - log(current)
+     *
+     * If score is above the threshold, bucket is reported as sparse bucket.
+     */
+    private static final int DATA_SPARSITY_THRESHOLD = 2;
+
+    private static final Logger LOGGER = Loggers.getLogger(DataStreamDiagnostics.class);
+
+    private final BucketDiagnostics bucketDiagnostics;
+
+    private long bucketCount = 0;
+    private long emptyBucketCount = 0;
+    private long latestEmptyBucketTime = -1;
+    private long sparseBucketCount = 0;
+    private long latestSparseBucketTime = -1;
+
+    public DataStreamDiagnostics(Job job) {
+        bucketDiagnostics = new BucketDiagnostics(job, createBucketFlushListener());
+    }
+
+    private BucketDiagnostics.BucketFlushListener createBucketFlushListener() {
+        return (flushedBucketStartMs, flushedBucketCount) -> {
+            ++bucketCount;
+            if (flushedBucketCount == 0) {
+                ++emptyBucketCount;
+                latestEmptyBucketTime = flushedBucketStartMs;
+            } else {
+                // simplistic way to calculate data sparsity, just take the log and
+                // check the difference
+                double averageBucketSize = bucketDiagnostics.averageBucketCount();
+                double logAverageBucketSize = Math.log(averageBucketSize);
+                double logBucketSize = Math.log(flushedBucketCount);
+                double sparsityScore = logAverageBucketSize - logBucketSize;
+
+                if (sparsityScore > DATA_SPARSITY_THRESHOLD) {
+                    LOGGER.debug("Sparse bucket {}, this bucket: {} average: {}, sparsity score: {}", flushedBucketStartMs,
+                            flushedBucketCount, averageBucketSize, sparsityScore);
+                    ++sparseBucketCount;
+                    latestSparseBucketTime = flushedBucketStartMs;
+                }
+            }
+        };
+    }
+
+    /**
+     * Check record
+     *
+     * @param recordTimestampInMs
+     *            The record timestamp in milliseconds since epoch
+     */
+    public void checkRecord(long recordTimestampInMs) {
+        bucketDiagnostics.addRecord(recordTimestampInMs);
+    }
+
+    /**
+     * Flush all counters, should be called at the end of the data stream
+     */
+    public void flush() {
+        // flush all we know
+        bucketDiagnostics.flush();
+    }
+
+    public long getBucketCount() {
+        return bucketCount;
+    }
+
+    public long getEmptyBucketCount() {
+        return emptyBucketCount;
+    }
+
+    public Date getLatestEmptyBucketTime() {
+        return latestEmptyBucketTime > 0 ? new Date(latestEmptyBucketTime) : null;
+    }
+
+    public long getSparseBucketCount() {
+        return sparseBucketCount;
+    }
+
+    public Date getLatestSparseBucketTime() {
+        return latestSparseBucketTime > 0 ? new Date(latestSparseBucketTime) : null;
+    }
+
+    /**
+     * Resets counts.
+     *
+     * Note: This does not reset the inner state for e.g. sparse bucket
+     * detection.
+     *
+     */
+    public void resetCounts() {
+        bucketCount = 0;
+        emptyBucketCount = 0;
+        sparseBucketCount = 0;
+    }
+}
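Since the threshold of 2 is applied to a difference of natural logs, a flushed bucket is flagged as sparse exactly when its count falls below average/e², roughly average/7.39. A worked check of the listener's arithmetic (standalone sketch; the numbers are chosen for illustration):

```java
// Worked example of the sparsity score computed in the flush listener.
double averageBucketSize = 100.0;  // moving average of records per bucket
long flushedBucketCount = 10;      // records in the bucket being flushed

double sparsityScore = Math.log(averageBucketSize) - Math.log(flushedBucketCount);
// log(100) - log(10) ~= 2.30 > DATA_SPARSITY_THRESHOLD (2): reported as sparse.
// At 14 records the score is ~1.97, so the bucket would not be flagged.
```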
+ * + */ + public void resetCounts() { + bucketCount = 0; + emptyBucketCount = 0; + sparseBucketCount = 0; + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnosticsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java similarity index 81% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnosticsTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java index 2c167f0df82cf..19f7f88c38fef 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnosticsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process; +package org.elasticsearch.xpack.ml.job.process.diagnostics; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; @@ -13,7 +13,6 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.junit.Before; -import java.io.IOException; import java.util.Arrays; import java.util.Date; @@ -21,9 +20,9 @@ public class DataStreamDiagnosticsTests extends ESTestCase { private static final long BUCKET_SPAN = 60000; private Job job; - + @Before - public void setUpMocks() throws IOException { + public void setUpMocks() { AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); acBuilder.setBucketSpan(TimeValue.timeValueMillis(BUCKET_SPAN)); acBuilder.setLatency(TimeValue.ZERO); @@ -32,7 +31,7 @@ public void setUpMocks() throws IOException { Job.Builder builder = new Job.Builder("job_id"); builder.setAnalysisConfig(acBuilder); builder.setDataDescription(new DataDescription.Builder()); - job = builder.build(new Date()); + job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), null); } public void testIncompleteBuckets() { @@ -80,6 +79,7 @@ public void testIncompleteBuckets() { assertEquals(null, d.getLatestSparseBucketTime()); assertEquals(new Date(BUCKET_SPAN * 2), d.getLatestEmptyBucketTime()); } + public void testSimple() { DataStreamDiagnostics d = new DataStreamDiagnostics(job); @@ -102,6 +102,58 @@ public void testSimple() { assertEquals(null, d.getLatestEmptyBucketTime()); } + public void testSimpleReverse() { + DataStreamDiagnostics d = new DataStreamDiagnostics(job); + + d.checkRecord(610000); + d.checkRecord(550000); + d.checkRecord(490000); + d.checkRecord(430000); + d.checkRecord(370000); + d.checkRecord(310000); + d.checkRecord(250000); + d.checkRecord(190000); + d.checkRecord(130000); + d.checkRecord(70000); + + d.flush(); + assertEquals(9, d.getBucketCount()); + assertEquals(0, d.getEmptyBucketCount()); + assertEquals(0, d.getSparseBucketCount()); + assertEquals(null, d.getLatestSparseBucketTime()); + assertEquals(null, d.getLatestEmptyBucketTime()); + } + + public void testWithLatencyLessThanTenBuckets() { + job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(3 * BUCKET_SPAN)); + DataStreamDiagnostics d = new DataStreamDiagnostics(job); + + long timestamp = 70000; + while (timestamp < 70000 + 20 * BUCKET_SPAN) { + sendManyDataPoints(d, timestamp - BUCKET_SPAN, timestamp + 
+            timestamp += BUCKET_SPAN;
+        }
+
+        assertEquals(10, d.getBucketCount());
+        d.flush();
+        assertEquals(19, d.getBucketCount());
+    }
+
+    public void testWithLatencyGreaterThanTenBuckets() {
+        job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(13 * BUCKET_SPAN + 10000));
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+
+        long timestamp = 70000;
+        while (timestamp < 70000 + 20 * BUCKET_SPAN) {
+            sendManyDataPoints(d, timestamp - BUCKET_SPAN, timestamp + timestamp, 100);
+            timestamp += BUCKET_SPAN;
+        }
+
+        assertEquals(6, d.getBucketCount());
+        d.flush();
+        assertEquals(19, d.getBucketCount());
+    }
+
     public void testEmptyBuckets() {
         DataStreamDiagnostics d = new DataStreamDiagnostics(job);
 
@@ -280,7 +332,7 @@ public void testEmptyBucketsLongerOutage() {
 
     /**
      * Send signals, make a longer period of sparse signals, then go up again
-     * 
+     *
      * The number of sparse buckets should not be to much, it could be normal.
      */
     public void testSparseBucketsLongerPeriod() {
@@ -307,6 +359,20 @@ public void testSparseBucketsLongerPeriod() {
         assertEquals(null, d.getLatestEmptyBucketTime());
     }
 
+    private static Job createJob(TimeValue bucketSpan, TimeValue latency) {
+        AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build()));
+        acBuilder.setBucketSpan(bucketSpan);
+        if (latency != null) {
+            acBuilder.setLatency(latency);
+        }
+        acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build()));
+
+        Job.Builder builder = new Job.Builder("job_id");
+        builder.setAnalysisConfig(acBuilder);
+        builder.setDataDescription(new DataDescription.Builder());
+        return builder.build(new Date());
+    }
+
     public void testFlushAfterZeroRecords() {
         DataStreamDiagnostics d = new DataStreamDiagnostics(job);
         d.flush();
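The expected counts in the two latency tests follow from how `BucketDiagnostics` sizes its window: the latency is rounded up to whole buckets, with a floor of `MIN_BUCKETS` (10). A sketch of that arithmetic with the tests' values (`Intervals.alignToCeil` is inlined here as plain round-up division):

```java
// Window sizing, mirroring the BucketDiagnostics constructor.
long bucketSpanMs = 60_000;                 // BUCKET_SPAN in these tests
long latencyMs = 3 * bucketSpanMs;          // the "less than ten buckets" case
long alignedMs = ((latencyMs + bucketSpanMs - 1) / bucketSpanMs) * bucketSpanMs;
int maxSize = Math.max((int) (alignedMs / bucketSpanMs), 10);  // -> 10

// Streaming 20 buckets pushes 20 - 10 = 10 of them out of the window,
// hence getBucketCount() == 10 before flush(); flush() then reports the
// rest except the latest bucket, for 19 in total. With latency
// 13 * BUCKET_SPAN + 10000 the window rounds up to 14 buckets, so only
// 20 - 14 = 6 buckets are flushed while streaming.
```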