diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStats.java index 17617f556cf7b..9f9215e5046fe 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStats.java @@ -36,7 +36,9 @@ public class DatafeedTimingStats implements ToXContentObject { public static final ParseField JOB_ID = new ParseField("job_id"); public static final ParseField SEARCH_COUNT = new ParseField("search_count"); + public static final ParseField BUCKET_COUNT = new ParseField("bucket_count"); public static final ParseField TOTAL_SEARCH_TIME_MS = new ParseField("total_search_time_ms"); + public static final ParseField AVG_SEARCH_TIME_PER_BUCKET_MS = new ParseField("average_search_time_per_bucket_ms"); public static final ParseField TYPE = new ParseField("datafeed_timing_stats"); @@ -50,23 +52,37 @@ private static ConstructingObjectParser createParser( args -> { String jobId = (String) args[0]; Long searchCount = (Long) args[1]; - Double totalSearchTimeMs = (Double) args[2]; - return new DatafeedTimingStats(jobId, getOrDefault(searchCount, 0L), getOrDefault(totalSearchTimeMs, 0.0)); + Long bucketCount = (Long) args[2]; + Double totalSearchTimeMs = (Double) args[3]; + Double avgSearchTimePerBucketMs = (Double) args[4]; + return new DatafeedTimingStats( + jobId, + getOrDefault(searchCount, 0L), + getOrDefault(bucketCount, 0L), + getOrDefault(totalSearchTimeMs, 0.0), + avgSearchTimePerBucketMs); }); parser.declareString(constructorArg(), JOB_ID); parser.declareLong(optionalConstructorArg(), SEARCH_COUNT); + parser.declareLong(optionalConstructorArg(), BUCKET_COUNT); parser.declareDouble(optionalConstructorArg(), TOTAL_SEARCH_TIME_MS); + parser.declareDouble(optionalConstructorArg(), AVG_SEARCH_TIME_PER_BUCKET_MS); return parser; } private final String jobId; private long searchCount; + private long bucketCount; private double totalSearchTimeMs; + private Double avgSearchTimePerBucketMs; - public DatafeedTimingStats(String jobId, long searchCount, double totalSearchTimeMs) { + public DatafeedTimingStats( + String jobId, long searchCount, long bucketCount, double totalSearchTimeMs, @Nullable Double avgSearchTimePerBucketMs) { this.jobId = Objects.requireNonNull(jobId); this.searchCount = searchCount; + this.bucketCount = bucketCount; this.totalSearchTimeMs = totalSearchTimeMs; + this.avgSearchTimePerBucketMs = avgSearchTimePerBucketMs; } public String getJobId() { @@ -77,16 +93,28 @@ public long getSearchCount() { return searchCount; } + public long getBucketCount() { + return bucketCount; + } + public double getTotalSearchTimeMs() { return totalSearchTimeMs; } + public Double getAvgSearchTimePerBucketMs() { + return avgSearchTimePerBucketMs; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(); builder.field(JOB_ID.getPreferredName(), jobId); builder.field(SEARCH_COUNT.getPreferredName(), searchCount); + builder.field(BUCKET_COUNT.getPreferredName(), bucketCount); builder.field(TOTAL_SEARCH_TIME_MS.getPreferredName(), totalSearchTimeMs); + if (avgSearchTimePerBucketMs != null) { + builder.field(AVG_SEARCH_TIME_PER_BUCKET_MS.getPreferredName(), avgSearchTimePerBucketMs); + } builder.endObject(); return builder; } @@ -103,12 +131,14 @@ public boolean equals(Object obj) { DatafeedTimingStats other = (DatafeedTimingStats) obj; return Objects.equals(this.jobId, other.jobId) && this.searchCount == other.searchCount - && this.totalSearchTimeMs == other.totalSearchTimeMs; + && this.bucketCount == other.bucketCount + && this.totalSearchTimeMs == other.totalSearchTimeMs + && Objects.equals(this.avgSearchTimePerBucketMs, other.avgSearchTimePerBucketMs); } @Override public int hashCode() { - return Objects.hash(jobId, searchCount, totalSearchTimeMs); + return Objects.hash(jobId, searchCount, bucketCount, totalSearchTimeMs, avgSearchTimePerBucketMs); } @Override diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java index 73393140f30af..9493270c4b936 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java @@ -39,6 +39,7 @@ public class TimingStats implements ToXContentObject { public static final ParseField BUCKET_COUNT = new ParseField("bucket_count"); + public static final ParseField TOTAL_BUCKET_PROCESSING_TIME_MS = new ParseField("total_bucket_processing_time_ms"); public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms"); public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms"); public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms"); @@ -49,12 +50,28 @@ public class TimingStats implements ToXContentObject { new ConstructingObjectParser<>( "timing_stats", true, - args -> - new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4], (Double) args[5])); + args -> { + String jobId = (String) args[0]; + Long bucketCount = (Long) args[1]; + Double totalBucketProcessingTimeMs = (Double) args[2]; + Double minBucketProcessingTimeMs = (Double) args[3]; + Double maxBucketProcessingTimeMs = (Double) args[4]; + Double avgBucketProcessingTimeMs = (Double) args[5]; + Double exponentialAvgBucketProcessingTimeMs = (Double) args[6]; + return new TimingStats( + jobId, + getOrDefault(bucketCount, 0L), + getOrDefault(totalBucketProcessingTimeMs, 0.0), + minBucketProcessingTimeMs, + maxBucketProcessingTimeMs, + avgBucketProcessingTimeMs, + exponentialAvgBucketProcessingTimeMs); + }); static { PARSER.declareString(constructorArg(), Job.ID); - PARSER.declareLong(constructorArg(), BUCKET_COUNT); + PARSER.declareLong(optionalConstructorArg(), BUCKET_COUNT); + PARSER.declareDouble(optionalConstructorArg(), TOTAL_BUCKET_PROCESSING_TIME_MS); PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS); PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS); PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS); @@ -63,6 +80,7 @@ public class TimingStats implements ToXContentObject { private final String jobId; private long bucketCount; + private double totalBucketProcessingTimeMs; private Double minBucketProcessingTimeMs; private Double maxBucketProcessingTimeMs; private Double avgBucketProcessingTimeMs; @@ -71,12 +89,14 @@ public class TimingStats implements ToXContentObject { public TimingStats( String jobId, long bucketCount, + double totalBucketProcessingTimeMs, @Nullable Double minBucketProcessingTimeMs, @Nullable Double maxBucketProcessingTimeMs, @Nullable Double avgBucketProcessingTimeMs, @Nullable Double exponentialAvgBucketProcessingTimeMs) { this.jobId = jobId; this.bucketCount = bucketCount; + this.totalBucketProcessingTimeMs = totalBucketProcessingTimeMs; this.minBucketProcessingTimeMs = minBucketProcessingTimeMs; this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs; this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs; @@ -91,6 +111,10 @@ public long getBucketCount() { return bucketCount; } + public double getTotalBucketProcessingTimeMs() { + return totalBucketProcessingTimeMs; + } + public Double getMinBucketProcessingTimeMs() { return minBucketProcessingTimeMs; } @@ -112,6 +136,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par builder.startObject(); builder.field(Job.ID.getPreferredName(), jobId); builder.field(BUCKET_COUNT.getPreferredName(), bucketCount); + builder.field(TOTAL_BUCKET_PROCESSING_TIME_MS.getPreferredName(), totalBucketProcessingTimeMs); if (minBucketProcessingTimeMs != null) { builder.field(MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), minBucketProcessingTimeMs); } @@ -135,6 +160,7 @@ public boolean equals(Object o) { TimingStats that = (TimingStats) o; return Objects.equals(this.jobId, that.jobId) && this.bucketCount == that.bucketCount + && this.totalBucketProcessingTimeMs == that.totalBucketProcessingTimeMs && Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs) && Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs) && Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs) @@ -146,6 +172,7 @@ public int hashCode() { return Objects.hash( jobId, bucketCount, + totalBucketProcessingTimeMs, minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs, @@ -156,4 +183,8 @@ public int hashCode() { public String toString() { return Strings.toString(this); } + + private static T getOrDefault(@Nullable T value, T defaultValue) { + return value != null ? value : defaultValue; + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStatsTests.java index 0a5134606da45..cde92b78f6c16 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStatsTests.java @@ -27,13 +27,15 @@ import java.io.IOException; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; public class DatafeedTimingStatsTests extends AbstractXContentTestCase { private static final String JOB_ID = "my-job-id"; public static DatafeedTimingStats createRandomInstance() { - return new DatafeedTimingStats(randomAlphaOfLength(10), randomLong(), randomDouble()); + return new DatafeedTimingStats( + randomAlphaOfLength(10), randomLong(), randomLong(), randomDouble(), randomBoolean() ? null : randomDouble()); } @Override @@ -59,14 +61,16 @@ public void testParse_OptionalFieldsAbsent() throws IOException { DatafeedTimingStats stats = DatafeedTimingStats.PARSER.apply(parser, null); assertThat(stats.getJobId(), equalTo(JOB_ID)); assertThat(stats.getSearchCount(), equalTo(0L)); + assertThat(stats.getBucketCount(), equalTo(0L)); assertThat(stats.getTotalSearchTimeMs(), equalTo(0.0)); + assertThat(stats.getAvgSearchTimePerBucketMs(), nullValue()); } } public void testEquals() { - DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0); - DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0); - DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0); + DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0, 20.0); + DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0, 20.0); + DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 10, 200.0, 20.0); assertTrue(stats1.equals(stats1)); assertTrue(stats1.equals(stats2)); @@ -74,9 +78,9 @@ public void testEquals() { } public void testHashCode() { - DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0); - DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0); - DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0); + DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0, 20.0); + DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0, 20.0); + DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 10, 200.0, 20.0); assertEquals(stats1.hashCode(), stats1.hashCode()); assertEquals(stats1.hashCode(), stats2.hashCode()); @@ -84,9 +88,11 @@ public void testHashCode() { } public void testConstructorAndGetters() { - DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 123.456); + DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 123.456, 78.9); assertThat(stats.getJobId(), equalTo(JOB_ID)); assertThat(stats.getSearchCount(), equalTo(5L)); + assertThat(stats.getBucketCount(), equalTo(10L)); assertThat(stats.getTotalSearchTimeMs(), equalTo(123.456)); + assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(78.9)); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/process/TimingStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/process/TimingStatsTests.java index 386bc6ac0b3dd..144cf9548e6ff 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/process/TimingStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/process/TimingStatsTests.java @@ -18,9 +18,14 @@ */ package org.elasticsearch.client.ml.job.process; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.AbstractXContentTestCase; +import java.io.IOException; + import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -32,6 +37,7 @@ public static TimingStats createTestInstance(String jobId) { return new TimingStats( jobId, randomLong(), + randomDouble(), randomBoolean() ? null : randomDouble(), randomBoolean() ? null : randomDouble(), randomBoolean() ? null : randomDouble(), @@ -54,10 +60,11 @@ protected boolean supportsUnknownFields() { } public void testConstructor() { - TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89); + TimingStats stats = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89); assertThat(stats.getJobId(), equalTo(JOB_ID)); assertThat(stats.getBucketCount(), equalTo(7L)); + assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(8.61)); assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0)); assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0)); assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23)); @@ -65,20 +72,37 @@ public void testConstructor() { } public void testConstructor_NullValues() { - TimingStats stats = new TimingStats(JOB_ID, 7, null, null, null, null); + TimingStats stats = new TimingStats(JOB_ID, 7, 8.61, null, null, null, null); assertThat(stats.getJobId(), equalTo(JOB_ID)); assertThat(stats.getBucketCount(), equalTo(7L)); + assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(8.61)); assertThat(stats.getMinBucketProcessingTimeMs(), nullValue()); assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue()); assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue()); assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), nullValue()); } + public void testParse_OptionalFieldsAbsent() throws IOException { + String json = "{\"job_id\": \"my-job-id\"}"; + try (XContentParser parser = + XContentFactory.xContent(XContentType.JSON).createParser( + xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)) { + TimingStats stats = TimingStats.PARSER.apply(parser, null); + assertThat(stats.getJobId(), equalTo(JOB_ID)); + assertThat(stats.getBucketCount(), equalTo(0L)); + assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(0.0)); + assertThat(stats.getMinBucketProcessingTimeMs(), nullValue()); + assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue()); + assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue()); + assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), nullValue()); + } + } + public void testEquals() { - TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89); - TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89); - TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89); + TimingStats stats1 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89); + TimingStats stats2 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89); + TimingStats stats3 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 3.0, 1.23, 7.89); assertTrue(stats1.equals(stats1)); assertTrue(stats1.equals(stats2)); @@ -86,9 +110,9 @@ public void testEquals() { } public void testHashCode() { - TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89); - TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89); - TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89); + TimingStats stats1 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89); + TimingStats stats2 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89); + TimingStats stats3 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 3.0, 1.23, 7.89); assertEquals(stats1.hashCode(), stats1.hashCode()); assertEquals(stats1.hashCode(), stats2.hashCode()); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java index e07ad3ba31dc5..5ce59b542f366 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java @@ -24,8 +24,10 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; +import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -38,6 +40,9 @@ public class GetDatafeedsStatsAction extends StreamableResponseActionType private static final String FORECASTS_STATS = "forecasts_stats"; private static final String STATE = "state"; private static final String NODE = "node"; + private static final String ASSIGNMENT_EXPLANATION = "assignment_explanation"; + private static final String OPEN_TIME = "open_time"; private static final String TIMING_STATS = "timing_stats"; private GetJobsStatsAction() { @@ -266,13 +269,16 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc builder.endObject(); } if (assignmentExplanation != null) { - builder.field("assignment_explanation", assignmentExplanation); + builder.field(ASSIGNMENT_EXPLANATION, assignmentExplanation); } if (openTime != null) { - builder.field("open_time", openTime.getStringRep()); + builder.field(OPEN_TIME, openTime.getStringRep()); } if (timingStats != null) { - builder.field(TIMING_STATS, timingStats); + builder.field( + TIMING_STATS, + timingStats, + new MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_CALCULATED_FIELDS, "true"))); } return builder; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStats.java index 443cb84ecdc0a..775dc9931bc86 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStats.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; import java.util.Objects; @@ -26,7 +27,9 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable { public static final ParseField JOB_ID = new ParseField("job_id"); public static final ParseField SEARCH_COUNT = new ParseField("search_count"); + public static final ParseField BUCKET_COUNT = new ParseField("bucket_count"); public static final ParseField TOTAL_SEARCH_TIME_MS = new ParseField("total_search_time_ms"); + public static final ParseField AVG_SEARCH_TIME_PER_BUCKET_MS = new ParseField("average_search_time_per_bucket_ms"); public static final ParseField TYPE = new ParseField("datafeed_timing_stats"); @@ -40,11 +43,14 @@ private static ConstructingObjectParser createParser( args -> { String jobId = (String) args[0]; Long searchCount = (Long) args[1]; - Double totalSearchTimeMs = (Double) args[2]; - return new DatafeedTimingStats(jobId, getOrDefault(searchCount, 0L), getOrDefault(totalSearchTimeMs, 0.0)); + Long bucketCount = (Long) args[2]; + Double totalSearchTimeMs = (Double) args[3]; + return new DatafeedTimingStats( + jobId, getOrDefault(searchCount, 0L), getOrDefault(bucketCount, 0L), getOrDefault(totalSearchTimeMs, 0.0)); }); parser.declareString(constructorArg(), JOB_ID); parser.declareLong(optionalConstructorArg(), SEARCH_COUNT); + parser.declareLong(optionalConstructorArg(), BUCKET_COUNT); parser.declareDouble(optionalConstructorArg(), TOTAL_SEARCH_TIME_MS); return parser; } @@ -55,26 +61,29 @@ public static String documentId(String jobId) { private final String jobId; private long searchCount; + private long bucketCount; private double totalSearchTimeMs; - public DatafeedTimingStats(String jobId, long searchCount, double totalSearchTimeMs) { + public DatafeedTimingStats(String jobId, long searchCount, long bucketCount, double totalSearchTimeMs) { this.jobId = Objects.requireNonNull(jobId); this.searchCount = searchCount; + this.bucketCount = bucketCount; this.totalSearchTimeMs = totalSearchTimeMs; } public DatafeedTimingStats(String jobId) { - this(jobId, 0, 0); + this(jobId, 0, 0, 0.0); } public DatafeedTimingStats(StreamInput in) throws IOException { jobId = in.readString(); searchCount = in.readLong(); + bucketCount = in.readLong(); totalSearchTimeMs = in.readDouble(); } public DatafeedTimingStats(DatafeedTimingStats other) { - this(other.jobId, other.searchCount, other.totalSearchTimeMs); + this(other.jobId, other.searchCount, other.bucketCount, other.totalSearchTimeMs); } public String getJobId() { @@ -85,19 +94,34 @@ public long getSearchCount() { return searchCount; } + public long getBucketCount() { + return bucketCount; + } + public double getTotalSearchTimeMs() { return totalSearchTimeMs; } + public Double getAvgSearchTimePerBucketMs() { + return bucketCount > 0 + ? totalSearchTimeMs / bucketCount + : null; + } + public void incrementTotalSearchTimeMs(double searchTimeMs) { this.searchCount++; this.totalSearchTimeMs += searchTimeMs; } + public void setBucketCount(long bucketCount) { + this.bucketCount = bucketCount; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); out.writeLong(searchCount); + out.writeLong(bucketCount); out.writeDouble(totalSearchTimeMs); } @@ -106,7 +130,14 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par builder.startObject(); builder.field(JOB_ID.getPreferredName(), jobId); builder.field(SEARCH_COUNT.getPreferredName(), searchCount); + builder.field(BUCKET_COUNT.getPreferredName(), bucketCount); builder.field(TOTAL_SEARCH_TIME_MS.getPreferredName(), totalSearchTimeMs); + if (params.paramAsBoolean(ToXContentParams.INCLUDE_CALCULATED_FIELDS, false)) { + Double avgSearchTimePerBucket = getAvgSearchTimePerBucketMs(); + if (avgSearchTimePerBucket != null) { + builder.field(AVG_SEARCH_TIME_PER_BUCKET_MS.getPreferredName(), getAvgSearchTimePerBucketMs()); + } + } builder.endObject(); return builder; } @@ -123,12 +154,13 @@ public boolean equals(Object obj) { DatafeedTimingStats other = (DatafeedTimingStats) obj; return Objects.equals(this.jobId, other.jobId) && this.searchCount == other.searchCount + && this.bucketCount == other.bucketCount && this.totalSearchTimeMs == other.totalSearchTimeMs; } @Override public int hashCode() { - return Objects.hash(jobId, searchCount, totalSearchTimeMs); + return Objects.hash(jobId, searchCount, bucketCount, totalSearchTimeMs); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index 894741791124b..6981772066b96 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -936,6 +936,7 @@ private static void addTimingStatsExceptBucketCountMapping(XContentBuilder build /** * {@link DatafeedTimingStats} mapping. + * Does not include mapping for BUCKET_COUNT as this mapping is added by {@link #addDataCountsMapping} method. * * @throws IOException On builder write error */ @@ -944,6 +945,7 @@ private static void addDatafeedTimingStats(XContentBuilder builder) throws IOExc .startObject(DatafeedTimingStats.SEARCH_COUNT.getPreferredName()) .field(TYPE, LONG) .endObject() + // re-used: BUCKET_COUNT .startObject(DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName()) .field(TYPE, DOUBLE) .endObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStats.java index 0bcc4ea5f45d2..b526d614df3ab 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStats.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; import java.util.Objects; @@ -28,6 +29,7 @@ public class TimingStats implements ToXContentObject, Writeable { public static final ParseField BUCKET_COUNT = new ParseField("bucket_count"); + public static final ParseField TOTAL_BUCKET_PROCESSING_TIME_MS = new ParseField("total_bucket_processing_time_ms"); public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms"); public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms"); public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms"); @@ -40,8 +42,21 @@ public class TimingStats implements ToXContentObject, Writeable { new ConstructingObjectParser<>( TYPE.getPreferredName(), true, - args -> - new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4], (Double) args[5])); + args -> { + String jobId = (String) args[0]; + long bucketCount = (long) args[1]; + Double minBucketProcessingTimeMs = (Double) args[2]; + Double maxBucketProcessingTimeMs = (Double) args[3]; + Double avgBucketProcessingTimeMs = (Double) args[4]; + Double exponentialAvgBucketProcessingTimeMs = (Double) args[5]; + return new TimingStats( + jobId, + bucketCount, + minBucketProcessingTimeMs, + maxBucketProcessingTimeMs, + avgBucketProcessingTimeMs, + exponentialAvgBucketProcessingTimeMs); + }); static { PARSER.declareString(constructorArg(), Job.ID); @@ -109,6 +124,13 @@ public long getBucketCount() { return bucketCount; } + /** Calculates total bucket processing time as a product of the all-time average bucket processing time and the number of buckets. */ + public double getTotalBucketProcessingTimeMs() { + return avgBucketProcessingTimeMs != null + ? bucketCount * avgBucketProcessingTimeMs + : 0.0; + } + public Double getMinBucketProcessingTimeMs() { return minBucketProcessingTimeMs; } @@ -126,7 +148,7 @@ public Double getExponentialAvgBucketProcessingTimeMs() { } /** - * Updates the statistics (min, max, avg) for the given data point (bucket processing time). + * Updates the statistics (min, max, avg, exponential avg) for the given data point (bucket processing time). */ public void updateStats(double bucketProcessingTimeMs) { if (bucketProcessingTimeMs < 0.0) { @@ -175,6 +197,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(Job.ID.getPreferredName(), jobId); builder.field(BUCKET_COUNT.getPreferredName(), bucketCount); + if (params.paramAsBoolean(ToXContentParams.INCLUDE_CALCULATED_FIELDS, false)) { + builder.field(TOTAL_BUCKET_PROCESSING_TIME_MS.getPreferredName(), getTotalBucketProcessingTimeMs()); + } if (minBucketProcessingTimeMs != null) { builder.field(MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), minBucketProcessingTimeMs); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index ddd13e7fc31ba..51717c6bad2d0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -187,6 +187,7 @@ public final class ReservedFieldNames { TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), + DatafeedTimingStats.BUCKET_COUNT.getPreferredName(), DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), GetResult._ID, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ToXContentParams.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ToXContentParams.java index f7fb9d46ec8a8..249a319823c75 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ToXContentParams.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ToXContentParams.java @@ -24,6 +24,12 @@ public final class ToXContentParams { */ public static final String INCLUDE_TYPE = "include_type"; + /** + * When serialising POJOs to X Content this indicates whether the calculated (i.e. not stored) fields + * should be included or not + */ + public static final String INCLUDE_CALCULATED_FIELDS = "include_calculated_fields"; + private ToXContentParams() { } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedStatsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedStatsActionResponseTests.java index 0fbd8a3386eb9..c721bf6de2d6e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedStatsActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedStatsActionResponseTests.java @@ -77,7 +77,7 @@ public void testDatafeedStatsToXContent() throws IOException { Set.of(), Version.CURRENT); - DatafeedTimingStats timingStats = new DatafeedTimingStats("my-job-id", 5, 123.456); + DatafeedTimingStats timingStats = new DatafeedTimingStats("my-job-id", 5, 10, 100.0); Response.DatafeedStats stats = new Response.DatafeedStats("df-id", DatafeedState.STARTED, node, null, timingStats); @@ -109,9 +109,11 @@ public void testDatafeedStatsToXContent() throws IOException { assertThat(nodeAttributes, hasEntry("ml.max_open_jobs", "5")); Map timingStatsMap = (Map) dfStatsMap.get("timing_stats"); - assertThat(timingStatsMap.size(), is(equalTo(3))); + assertThat(timingStatsMap.size(), is(equalTo(5))); assertThat(timingStatsMap, hasEntry("job_id", "my-job-id")); assertThat(timingStatsMap, hasEntry("search_count", 5)); - assertThat(timingStatsMap, hasEntry("total_search_time_ms", 123.456)); + assertThat(timingStatsMap, hasEntry("bucket_count", 10)); + assertThat(timingStatsMap, hasEntry("total_search_time_ms", 100.0)); + assertThat(timingStatsMap, hasEntry("average_search_time_per_bucket_ms", 10.0)); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStatsTests.java index 9ecff4974a751..e56475705eab1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStatsTests.java @@ -14,14 +14,16 @@ import java.io.IOException; +import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; public class DatafeedTimingStatsTests extends AbstractSerializingTestCase { private static final String JOB_ID = "my-job-id"; public static DatafeedTimingStats createRandom() { - return new DatafeedTimingStats(randomAlphaOfLength(10), randomLong(), randomDouble()); + return new DatafeedTimingStats(randomAlphaOfLength(10), randomLong(), randomLong(), randomDouble()); } @Override @@ -43,10 +45,12 @@ protected DatafeedTimingStats doParseInstance(XContentParser parser) { protected DatafeedTimingStats mutateInstance(DatafeedTimingStats instance) throws IOException { String jobId = instance.getJobId(); long searchCount = instance.getSearchCount(); + long bucketCount = instance.getBucketCount(); double totalSearchTimeMs = instance.getTotalSearchTimeMs(); return new DatafeedTimingStats( jobId + randomAlphaOfLength(5), - searchCount + 1, + searchCount + 2, + bucketCount + 1, totalSearchTimeMs + randomDoubleBetween(1.0, 100.0, true)); } @@ -58,14 +62,16 @@ public void testParse_OptionalFieldsAbsent() throws IOException { DatafeedTimingStats stats = DatafeedTimingStats.PARSER.apply(parser, null); assertThat(stats.getJobId(), equalTo(JOB_ID)); assertThat(stats.getSearchCount(), equalTo(0L)); + assertThat(stats.getBucketCount(), equalTo(0L)); assertThat(stats.getTotalSearchTimeMs(), equalTo(0.0)); + assertThat(stats.getAvgSearchTimePerBucketMs(), nullValue()); } } public void testEquals() { - DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0); - DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0); - DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0); + DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0); + DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0); + DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 10, 200.0); assertTrue(stats1.equals(stats1)); assertTrue(stats1.equals(stats2)); @@ -73,9 +79,9 @@ public void testEquals() { } public void testHashCode() { - DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0); - DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0); - DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0); + DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0); + DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0); + DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 10, 200.0); assertEquals(stats1.hashCode(), stats1.hashCode()); assertEquals(stats1.hashCode(), stats2.hashCode()); @@ -83,32 +89,72 @@ public void testHashCode() { } public void testConstructorsAndGetters() { - DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 123.456); + DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 123.456); assertThat(stats.getJobId(), equalTo(JOB_ID)); assertThat(stats.getSearchCount(), equalTo(5L)); + assertThat(stats.getBucketCount(), equalTo(10L)); assertThat(stats.getTotalSearchTimeMs(), equalTo(123.456)); + assertThat(stats.getAvgSearchTimePerBucketMs(), closeTo(12.3456, 1e-9)); stats = new DatafeedTimingStats(JOB_ID); assertThat(stats.getJobId(), equalTo(JOB_ID)); assertThat(stats.getSearchCount(), equalTo(0L)); + assertThat(stats.getBucketCount(), equalTo(0L)); assertThat(stats.getTotalSearchTimeMs(), equalTo(0.0)); + assertThat(stats.getAvgSearchTimePerBucketMs(), nullValue()); } public void testCopyConstructor() { - DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 123.456); + DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 123.456); DatafeedTimingStats stats2 = new DatafeedTimingStats(stats1); assertThat(stats2.getJobId(), equalTo(JOB_ID)); assertThat(stats2.getSearchCount(), equalTo(5L)); + assertThat(stats2.getBucketCount(), equalTo(10L)); assertThat(stats2.getTotalSearchTimeMs(), equalTo(123.456)); + assertThat(stats2.getAvgSearchTimePerBucketMs(), closeTo(12.3456, 1e-9)); } public void testIncrementTotalSearchTimeMs() { - DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 100.0); + DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0); stats.incrementTotalSearchTimeMs(200.0); assertThat(stats.getJobId(), equalTo(JOB_ID)); assertThat(stats.getSearchCount(), equalTo(6L)); + assertThat(stats.getBucketCount(), equalTo(10L)); assertThat(stats.getTotalSearchTimeMs(), equalTo(300.0)); + assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(30.0)); + } + + public void testSetBucketCount() { + DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0); + stats.setBucketCount(20); + assertThat(stats.getJobId(), equalTo(JOB_ID)); + assertThat(stats.getSearchCount(), equalTo(5L)); + assertThat(stats.getBucketCount(), equalTo(20L)); + assertThat(stats.getTotalSearchTimeMs(), equalTo(100.0)); + assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(5.0)); + } + + public void testAvgSearchTimePerBucketIsCalculatedProperlyAfterUpdates() { + DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0); + assertThat(stats.getBucketCount(), equalTo(10L)); + assertThat(stats.getTotalSearchTimeMs(), equalTo(100.0)); + assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(10.0)); + + stats.setBucketCount(20); + assertThat(stats.getBucketCount(), equalTo(20L)); + assertThat(stats.getTotalSearchTimeMs(), equalTo(100.0)); + assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(5.0)); + + stats.incrementTotalSearchTimeMs(200.0); + assertThat(stats.getBucketCount(), equalTo(20L)); + assertThat(stats.getTotalSearchTimeMs(), equalTo(300.0)); + assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(15.0)); + + stats.setBucketCount(25); + assertThat(stats.getBucketCount(), equalTo(25L)); + assertThat(stats.getTotalSearchTimeMs(), equalTo(300.0)); + assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(12.0)); } public void testDocumentId() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStatsTests.java index 99e29d01bd724..1a35d0feaedaf 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStatsTests.java @@ -69,6 +69,7 @@ public void testDefaultConstructor() { assertThat(stats.getJobId(), equalTo(JOB_ID)); assertThat(stats.getBucketCount(), equalTo(0L)); + assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(0.0)); assertThat(stats.getMinBucketProcessingTimeMs(), nullValue()); assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue()); assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue()); @@ -80,6 +81,7 @@ public void testConstructor() { assertThat(stats.getJobId(), equalTo(JOB_ID)); assertThat(stats.getBucketCount(), equalTo(7L)); + assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(8.61)); assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0)); assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0)); assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23)); @@ -92,6 +94,7 @@ public void testCopyConstructor() { assertThat(stats2.getJobId(), equalTo(JOB_ID)); assertThat(stats2.getBucketCount(), equalTo(7L)); + assertThat(stats2.getTotalBucketProcessingTimeMs(), equalTo(8.61)); assertThat(stats2.getMinBucketProcessingTimeMs(), equalTo(1.0)); assertThat(stats2.getMaxBucketProcessingTimeMs(), equalTo(2.0)); assertThat(stats2.getAvgBucketProcessingTimeMs(), equalTo(1.23)); @@ -119,6 +122,26 @@ public void testUpdateStats() { assertThat(stats, areCloseTo(new TimingStats(JOB_ID, 5, 1.0, 5.0, 3.0, 3.00029801), 1e-9)); } + public void testTotalBucketProcessingTimeIsCalculatedProperlyAfterUpdates() { + TimingStats stats = new TimingStats(JOB_ID); + assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(0.0)); + + stats.updateStats(3); + assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(3.0)); + + stats.updateStats(2); + assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(5.0)); + + stats.updateStats(4); + assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(9.0)); + + stats.updateStats(1); + assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(10.0)); + + stats.updateStats(5); + assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(15.0)); + } + public void testDocumentId() { assertThat(TimingStats.documentId("my-job-id"), equalTo("my-job-id_timing_stats")); } @@ -138,6 +161,7 @@ private static Matcher areCloseTo(TimingStats operand, double error protected boolean matchesSafely(TimingStats item) { return equalTo(operand.getJobId()).matches(item.getJobId()) && equalTo(operand.getBucketCount()).matches(item.getBucketCount()) + && closeTo(operand.getTotalBucketProcessingTimeMs(), error).matches(item.getTotalBucketProcessingTimeMs()) && closeTo(operand.getMinBucketProcessingTimeMs(), error).matches(item.getMinBucketProcessingTimeMs()) && closeTo(operand.getMaxBucketProcessingTimeMs(), error).matches(item.getMaxBucketProcessingTimeMs()) && closeTo(operand.getAvgBucketProcessingTimeMs(), error).matches(item.getAvgBucketProcessingTimeMs()) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index f657833cc1de3..48f951ef7094c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -251,7 +251,7 @@ private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeed datafeed, job, xContentRegistry, - // Creating fake {@link TimingStatsReporter} so that search API call is not needed. + // Creating fake DatafeedTimingStatsReporter so that search API call is not needed. new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), jobResultsPersister), ActionListener.wrap( unused -> diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 4a9e4fd41d9c2..18d724313e325 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -61,6 +61,7 @@ class DatafeedJob { private final long queryDelayMs; private final Client client; private final DataExtractorFactory dataExtractorFactory; + private final DatafeedTimingStatsReporter timingStatsReporter; private final Supplier currentTimeSupplier; private final DelayedDataDetector delayedDataDetector; @@ -74,13 +75,15 @@ class DatafeedJob { private volatile boolean isIsolated; DatafeedJob(String jobId, DataDescription dataDescription, long frequencyMs, long queryDelayMs, - DataExtractorFactory dataExtractorFactory, Client client, Auditor auditor, Supplier currentTimeSupplier, - DelayedDataDetector delayedDataDetector, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { + DataExtractorFactory dataExtractorFactory, DatafeedTimingStatsReporter timingStatsReporter, Client client, + Auditor auditor, Supplier currentTimeSupplier, DelayedDataDetector delayedDataDetector, + long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { this.jobId = jobId; this.dataDescription = Objects.requireNonNull(dataDescription); this.frequencyMs = frequencyMs; this.queryDelayMs = queryDelayMs; this.dataExtractorFactory = dataExtractorFactory; + this.timingStatsReporter = timingStatsReporter; this.client = client; this.auditor = auditor; this.currentTimeSupplier = currentTimeSupplier; @@ -350,6 +353,7 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) thro try (InputStream in = extractedData.get()) { counts = postData(in, XContentType.JSON); LOGGER.trace("[{}] Processed another {} records", jobId, counts.getProcessedRecordCount()); + timingStatsReporter.reportDataCounts(counts); } catch (Exception e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index 0689b9774b56a..778e211640279 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -69,10 +69,20 @@ void build(String datafeedId, ActionListener listener) { TimeValue queryDelay = datafeedConfigHolder.get().getQueryDelay(); DelayedDataDetector delayedDataDetector = DelayedDataDetectorFactory.buildDetector(jobHolder.get(), datafeedConfigHolder.get(), client, xContentRegistry); - DatafeedJob datafeedJob = new DatafeedJob(jobHolder.get().getId(), buildDataDescription(jobHolder.get()), - frequency.millis(), queryDelay.millis(), - context.dataExtractorFactory, client, auditor, currentTimeSupplier, delayedDataDetector, - context.latestFinalBucketEndMs, context.latestRecordTimeMs); + DatafeedJob datafeedJob = + new DatafeedJob( + jobHolder.get().getId(), + buildDataDescription(jobHolder.get()), + frequency.millis(), + queryDelay.millis(), + context.dataExtractorFactory, + context.timingStatsReporter, + client, + auditor, + currentTimeSupplier, + delayedDataDetector, + context.latestFinalBucketEndMs, + context.latestRecordTimeMs); listener.onResponse(datafeedJob); }; @@ -92,12 +102,13 @@ void build(String datafeedId, ActionListener listener) { // Create data extractor factory Consumer datafeedTimingStatsHandler = timingStats -> { + context.timingStatsReporter = new DatafeedTimingStatsReporter(timingStats, jobResultsPersister); DataExtractorFactory.create( client, datafeedConfigHolder.get(), jobHolder.get(), xContentRegistry, - new DatafeedTimingStatsReporter(timingStats, jobResultsPersister), + context.timingStatsReporter, dataExtractorFactoryHandler); }; @@ -189,5 +200,6 @@ private static class Context { volatile long latestFinalBucketEndMs = -1L; volatile long latestRecordTimeMs = -1L; volatile DataExtractorFactory dataExtractorFactory; + volatile DatafeedTimingStatsReporter timingStatsReporter; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java index 260db421e0446..57d06828b0ba6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java @@ -8,6 +8,7 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import java.util.Objects; @@ -46,32 +47,55 @@ public void reportSearchDuration(TimeValue searchDuration) { return; } currentTimingStats.incrementTotalSearchTimeMs(searchDuration.millis()); + flushIfDifferSignificantly(); + } + + /** + * Reports the data counts received from the autodetect process. + */ + public void reportDataCounts(DataCounts dataCounts) { + if (dataCounts == null) { + return; + } + currentTimingStats.setBucketCount(dataCounts.getBucketCount()); + flushIfDifferSignificantly(); + } + + private void flushIfDifferSignificantly() { if (differSignificantly(currentTimingStats, persistedTimingStats)) { - // TODO: Consider changing refresh policy to NONE here and only do IMMEDIATE on datafeed _stop action - flush(WriteRequest.RefreshPolicy.IMMEDIATE); + flush(); } } - private void flush(WriteRequest.RefreshPolicy refreshPolicy) { + private void flush() { persistedTimingStats = new DatafeedTimingStats(currentTimingStats); - jobResultsPersister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy); + // TODO: Consider changing refresh policy to NONE here and only do IMMEDIATE on datafeed _stop action + jobResultsPersister.persistDatafeedTimingStats(persistedTimingStats, WriteRequest.RefreshPolicy.IMMEDIATE); } /** * Returns true if given stats objects differ from each other by more than 10% for at least one of the statistics. */ public static boolean differSignificantly(DatafeedTimingStats stats1, DatafeedTimingStats stats2) { - return differSignificantly(stats1.getTotalSearchTimeMs(), stats2.getTotalSearchTimeMs()); + return differSignificantly(stats1.getTotalSearchTimeMs(), stats2.getTotalSearchTimeMs()) + || differSignificantly(stats1.getAvgSearchTimePerBucketMs(), stats2.getAvgSearchTimePerBucketMs()); } /** - * Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO. + * Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO or + * the absolute difference |value1 - value2| is greater than MAX_VALID_ABS_DIFFERENCE_MS. * This can be interpreted as values { value1, value2 } differing significantly from each other. + * This method also returns: + * - {@code true} in case one value is {@code null} while the other is not. + * - {@code false} in case both values are {@code null}. */ - private static boolean differSignificantly(double value1, double value2) { - return (value2 / value1 < MIN_VALID_RATIO) - || (value1 / value2 < MIN_VALID_RATIO) - || Math.abs(value1 - value2) > MAX_VALID_ABS_DIFFERENCE_MS; + private static boolean differSignificantly(Double value1, Double value2) { + if (value1 != null && value2 != null) { + return (value2 / value1 < MIN_VALID_RATIO) + || (value1 / value2 < MIN_VALID_RATIO) + || Math.abs(value1 - value2) > MAX_VALID_ABS_DIFFERENCE_MS; + } + return (value1 != null) || (value2 != null); } /** diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 8d8bd84a97c12..7b33a59d048de 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -73,10 +73,11 @@ public class DatafeedJobTests extends ESTestCase { private Auditor auditor; private DataExtractorFactory dataExtractorFactory; private DataExtractor dataExtractor; + private DatafeedTimingStatsReporter timingStatsReporter; private Client client; private DelayedDataDetector delayedDataDetector; private DataDescription.Builder dataDescription; - ActionFuture postDataFuture; + private ActionFuture postDataFuture; private ActionFuture flushJobFuture; private ActionFuture indexFuture; private ArgumentCaptor flushJobRequests; @@ -93,6 +94,7 @@ public void setup() throws Exception { dataExtractorFactory = mock(DataExtractorFactory.class); dataExtractor = mock(DataExtractor.class); when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor); + timingStatsReporter = mock(DatafeedTimingStatsReporter.class); client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); @@ -455,7 +457,7 @@ public void testFlushAnalysisProblemIsConflict() { private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { Supplier currentTimeSupplier = () -> currentTime; - return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor, - currentTimeSupplier, delayedDataDetector, latestFinalBucketEndTimeMs, latestRecordTimeMs); + return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, timingStatsReporter, + client, auditor, currentTimeSupplier, delayedDataDetector, latestFinalBucketEndTimeMs, latestRecordTimeMs); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java index 9c86f05f2076e..93df704deae10 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.junit.Before; import org.mockito.InOrder; @@ -18,6 +19,7 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; public class DatafeedTimingStatsReporterTests extends ESTestCase { @@ -31,59 +33,106 @@ public void setUpTests() { jobResultsPersister = mock(JobResultsPersister.class); } + public void testReportSearchDuration_Null() { + DatafeedTimingStatsReporter timingStatsReporter = + new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0), jobResultsPersister); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + + timingStatsReporter.reportSearchDuration(null); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + + verifyZeroInteractions(jobResultsPersister); + } + public void testReportSearchDuration() { DatafeedTimingStatsReporter timingStatsReporter = - new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10000.0), jobResultsPersister); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10000.0))); + new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0), jobResultsPersister); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 4, 11000.0))); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 4, 10, 11000.0))); timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 5, 12000.0))); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 5, 10, 12000.0))); timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 6, 13000.0))); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 6, 10, 13000.0))); timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 7, 14000.0))); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 7, 10, 14000.0))); InOrder inOrder = inOrder(jobResultsPersister); inOrder.verify(jobResultsPersister).persistDatafeedTimingStats( - new DatafeedTimingStats(JOB_ID, 5, 12000.0), RefreshPolicy.IMMEDIATE); + new DatafeedTimingStats(JOB_ID, 5, 10, 12000.0), RefreshPolicy.IMMEDIATE); + inOrder.verify(jobResultsPersister).persistDatafeedTimingStats( + new DatafeedTimingStats(JOB_ID, 7, 10, 14000.0), RefreshPolicy.IMMEDIATE); + verifyNoMoreInteractions(jobResultsPersister); + } + + public void testReportDataCounts_Null() { + DatafeedTimingStatsReporter timingStatsReporter = + new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0), jobResultsPersister); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + + timingStatsReporter.reportDataCounts(null); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + + verifyZeroInteractions(jobResultsPersister); + } + + public void testReportDataCounts() { + DataCounts dataCounts = new DataCounts(JOB_ID); + dataCounts.incrementBucketCount(20); + DatafeedTimingStatsReporter timingStatsReporter = + new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, dataCounts.getBucketCount(), 10000.0), jobResultsPersister); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 20, 10000.0))); + + dataCounts.incrementBucketCount(1); + timingStatsReporter.reportDataCounts(dataCounts); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 21, 10000.0))); + + dataCounts.incrementBucketCount(1); + timingStatsReporter.reportDataCounts(dataCounts); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 22, 10000.0))); + + dataCounts.incrementBucketCount(1); + timingStatsReporter.reportDataCounts(dataCounts); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 23, 10000.0))); + + InOrder inOrder = inOrder(jobResultsPersister); inOrder.verify(jobResultsPersister).persistDatafeedTimingStats( - new DatafeedTimingStats(JOB_ID, 7, 14000.0), RefreshPolicy.IMMEDIATE); + new DatafeedTimingStats(JOB_ID, 3, 23, 10000.0), RefreshPolicy.IMMEDIATE); verifyNoMoreInteractions(jobResultsPersister); } public void testTimingStatsDifferSignificantly() { assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 1000.0)), + new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0)), is(false)); assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 1100.0)), + new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 1100.0)), is(false)); assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 1120.0)), + new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 1120.0)), is(true)); assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 11000.0)), + new DatafeedTimingStats(JOB_ID, 5, 10, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 11000.0)), is(false)); assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 11200.0)), + new DatafeedTimingStats(JOB_ID, 5, 10, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 11200.0)), is(true)); assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 110000.0)), + new DatafeedTimingStats(JOB_ID, 5, 10, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 110000.0)), is(false)); assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 110001.0)), + new DatafeedTimingStats(JOB_ID, 5, 10, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 110001.0)), is(true)); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index abece0bc07ea0..5de2f14e99260 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -245,7 +245,7 @@ public void testPersistDatafeedTimingStats() { .when(client).index(any(), any(ActionListener.class)); JobResultsPersister persister = new JobResultsPersister(client); - DatafeedTimingStats timingStats = new DatafeedTimingStats("foo", 6, 666.0); + DatafeedTimingStats timingStats = new DatafeedTimingStats("foo", 6, 66, 666.0); persister.persistDatafeedTimingStats(timingStats, WriteRequest.RefreshPolicy.IMMEDIATE); ArgumentCaptor indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class); @@ -260,6 +260,7 @@ public void testPersistDatafeedTimingStats() { Map.of( "job_id", "foo", "search_count", 6, + "bucket_count", 66, "total_search_time_ms", 666.0))); verify(client, times(1)).threadPool(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java index aec8b72e8cb72..131a1dcbf55de 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java @@ -903,12 +903,14 @@ public void testDatafeedTimingStats_MultipleDocumentsAtOnce() throws IOException Map.of( Job.ID.getPreferredName(), "foo", DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 6, + DatafeedTimingStats.BUCKET_COUNT.getPreferredName(), 66, DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 666.0)); List> sourceBar = Arrays.asList( Map.of( Job.ID.getPreferredName(), "bar", DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 7, + DatafeedTimingStats.BUCKET_COUNT.getPreferredName(), 77, DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 777.0)); SearchResponse responseFoo = createSearchResponse(sourceFoo); SearchResponse responseBar = createSearchResponse(sourceBar); @@ -943,7 +945,10 @@ public void testDatafeedTimingStats_MultipleDocumentsAtOnce() throws IOException statsByJobId -> assertThat( statsByJobId, - equalTo(Map.of("foo", new DatafeedTimingStats("foo", 6, 666.0), "bar", new DatafeedTimingStats("bar", 7, 777.0)))), + equalTo( + Map.of( + "foo", new DatafeedTimingStats("foo", 6, 66, 666.0), + "bar", new DatafeedTimingStats("bar", 7, 77, 777.0)))), e -> { throw new AssertionError(); }); verify(client).threadPool(); @@ -961,6 +966,7 @@ public void testDatafeedTimingStats_Ok() throws IOException { Map.of( Job.ID.getPreferredName(), "foo", DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 6, + DatafeedTimingStats.BUCKET_COUNT.getPreferredName(), 66, DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 666.0)); SearchResponse response = createSearchResponse(source); Client client = getMockedClient( @@ -971,7 +977,7 @@ public void testDatafeedTimingStats_Ok() throws IOException { JobResultsProvider provider = createProvider(client); provider.datafeedTimingStats( "foo", - stats -> assertThat(stats, equalTo(new DatafeedTimingStats("foo", 6, 666.0))), + stats -> assertThat(stats, equalTo(new DatafeedTimingStats("foo", 6, 66, 666.0))), e -> { throw new AssertionError(); }); verify(client).prepareSearch(indexName); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java index dd64e0d2ded95..ebfbf8d223db9 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java @@ -175,6 +175,7 @@ public void testToXContent() throws IOException { + "\"timing_stats\":{" + "\"job_id\":\"_job_id\"," + "\"bucket_count\":100," + + "\"total_bucket_processing_time_ms\":2000.0," + "\"minimum_bucket_processing_time_ms\":10.0," + "\"maximum_bucket_processing_time_ms\":30.0," + "\"average_bucket_processing_time_ms\":20.0," diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yml index aa0e023e64054..37e4245049756 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yml @@ -175,15 +175,18 @@ setup: ml.start_datafeed: datafeed_id: "datafeed-1" start: 0 + - match: { started: true} - do: ml.get_datafeed_stats: datafeed_id: datafeed-1 - - match: { datafeeds.0.datafeed_id: "datafeed-1"} - - match: { datafeeds.0.state: "started"} - - match: { datafeeds.0.timing_stats.job_id: "get-datafeed-stats-1"} - - match: { datafeeds.0.timing_stats.search_count: 0} - - match: { datafeeds.0.timing_stats.total_search_time_ms: 0.0} + - match: { datafeeds.0.datafeed_id: "datafeed-1"} + - match: { datafeeds.0.state: "started"} + - match: { datafeeds.0.timing_stats.job_id: "get-datafeed-stats-1"} + - match: { datafeeds.0.timing_stats.search_count: 0} + - match: { datafeeds.0.timing_stats.bucket_count: 0} + - match: { datafeeds.0.timing_stats.total_search_time_ms: 0.0} + - is_false: datafeeds.0.timing_stats.average_search_time_per_bucket_ms - do: ml.stop_datafeed: @@ -196,8 +199,9 @@ setup: - match: { datafeeds.0.datafeed_id: "datafeed-1"} - match: { datafeeds.0.state: "stopped"} - match: { datafeeds.0.timing_stats.job_id: "get-datafeed-stats-1"} - # TODO: Change "gte 0" to "match 1" once https://github.com/elastic/elasticsearch/issues/44132 is fixed + # We don't really know at this point if datafeed managed to perform at least one search, hence the very relaxed assertion - gte: { datafeeds.0.timing_stats.search_count: 0} + - gte: { datafeeds.0.timing_stats.bucket_count: 0} - gte: { datafeeds.0.timing_stats.total_search_time_ms: 0.0} --- diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml index f52a2c21f773c..5638f7b2e0dac 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml @@ -101,6 +101,7 @@ setup: - is_true: jobs.0.open_time - match: { jobs.0.timing_stats.job_id: job-stats-test } - match: { jobs.0.timing_stats.bucket_count: 1 } # Records are 1h apart and bucket span is 1h so 1 bucket is produced + - gte: { jobs.0.timing_stats.total_bucket_processing_time_ms: 0.0 } - gte: { jobs.0.timing_stats.minimum_bucket_processing_time_ms: 0.0 } - gte: { jobs.0.timing_stats.maximum_bucket_processing_time_ms: 0.0 } - gte: { jobs.0.timing_stats.average_bucket_processing_time_ms: 0.0 } @@ -140,6 +141,7 @@ setup: - is_false: jobs.0.open_time - match: { jobs.0.timing_stats.job_id: job-stats-test } - match: { jobs.0.timing_stats.bucket_count: 1 } # Records are 1h apart and bucket span is 1h so 1 bucket is produced + - gte: { jobs.0.timing_stats.total_bucket_processing_time_ms: 0.0 } - gte: { jobs.0.timing_stats.minimum_bucket_processing_time_ms: 0.0 } - gte: { jobs.0.timing_stats.maximum_bucket_processing_time_ms: 0.0 } - gte: { jobs.0.timing_stats.average_bucket_processing_time_ms: 0.0 } @@ -158,6 +160,7 @@ setup: - is_true: jobs.0.open_time - match: { jobs.0.timing_stats.job_id: jobs-get-stats-datafeed-job } - match: { jobs.0.timing_stats.bucket_count: 0 } + - match: { jobs.0.timing_stats.total_bucket_processing_time_ms: 0.0 } - is_false: jobs.0.timing_stats.minimum_bucket_processing_time_ms - is_false: jobs.0.timing_stats.maximum_bucket_processing_time_ms - is_false: jobs.0.timing_stats.average_bucket_processing_time_ms @@ -342,6 +345,7 @@ setup: - is_false: jobs.0.open_time - match: { jobs.0.timing_stats.job_id: job-stats-test } - match: { jobs.0.timing_stats.bucket_count: 0 } + - match: { jobs.0.timing_stats.total_bucket_processing_time_ms: 0.0 } - is_false: jobs.0.timing_stats.minimum_bucket_processing_time_ms - is_false: jobs.0.timing_stats.maximum_bucket_processing_time_ms - is_false: jobs.0.timing_stats.average_bucket_processing_time_ms @@ -356,6 +360,7 @@ setup: - is_false: jobs.1.open_time - match: { jobs.1.timing_stats.job_id: jobs-get-stats-datafeed-job } - match: { jobs.1.timing_stats.bucket_count: 0 } + - match: { jobs.1.timing_stats.total_bucket_processing_time_ms: 0.0 } - is_false: jobs.1.timing_stats.minimum_bucket_processing_time_ms - is_false: jobs.1.timing_stats.maximum_bucket_processing_time_ms - is_false: jobs.1.timing_stats.average_bucket_processing_time_ms