Commit fb412db

[ML] Fix datafeed skipping first bucket after lookback when aggs are … (elastic#39859) (elastic#39961)
The problem here was that `DatafeedJob` was updating the last end time searched based on `now`, even though, when there are aggregations, the extractor only searches up to the floor of `now` against the histogram interval. This commit fixes the issue by using the end time as calculated by the extractor. It also adds an integration test that uses aggregations; this test would fail before the fix. Unfortunately the test is slow, as we need to wait for the datafeed to work in real time. Closes elastic#39842
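To make the skipped bucket concrete, here is a minimal sketch of the mismatch, using illustrative numbers that do not come from the commit itself:

    // A minimal sketch of the bug, with illustrative values only.
    long histogramIntervalMs = 1_000L;
    long now = 62_300L;

    // With aggregations, the extractor only searches up to the floor of
    // `now` against the histogram interval:
    long extractorEnd = (now / histogramIntervalMs) * histogramIntervalMs; // 62_000

    // But DatafeedJob recorded the last end time searched from `now` itself
    // (62_300), so the interval [62_000, 62_300) was marked as done without
    // ever being searched -- the first bucket after lookback was skipped.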
1 parent 6b13189 commit fb412db

8 files changed, +174 −6 lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/DataExtractor.java

Lines changed: 5 additions & 0 deletions
@@ -33,4 +33,9 @@ public interface DataExtractor {
      * Cancel the current search.
      */
     void cancel();
+
+    /**
+     * @return the end time to which this extractor will search
+     */
+    long getEndTime();
 }
DatafeedWithAggsIT.java (new file)

Lines changed: 139 additions & 0 deletions

@@ -0,0 +1,139 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.junit.After;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

public class DatafeedWithAggsIT extends MlNativeAutodetectIntegTestCase {

    @After
    public void cleanup() {
        cleanUp();
    }

    public void testRealtime() throws Exception {
        String dataIndex = "datafeed-with-aggs-rt-data";

        // A job with a bucket_span of 2s
        String jobId = "datafeed-with-aggs-rt-job";
        DataDescription.Builder dataDescription = new DataDescription.Builder();

        Detector.Builder d = new Detector.Builder("count", null);
        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
        analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(2));
        analysisConfig.setSummaryCountFieldName("doc_count");

        Job.Builder jobBuilder = new Job.Builder();
        jobBuilder.setId(jobId);

        jobBuilder.setAnalysisConfig(analysisConfig);
        jobBuilder.setDataDescription(dataDescription);

        // Datafeed with aggs
        String datafeedId = jobId + "-feed";
        DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder(datafeedId, jobId);
        datafeedBuilder.setQueryDelay(TimeValue.timeValueMillis(100));
        datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(1));
        datafeedBuilder.setIndices(Collections.singletonList(dataIndex));

        AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
        aggs.addAggregator(AggregationBuilders.dateHistogram("time").field("time").interval(1000)
            .subAggregation(AggregationBuilders.max("time").field("time")));
        datafeedBuilder.setParsedAggregations(aggs);

        DatafeedConfig datafeed = datafeedBuilder.build();

        // Create stuff and open job
        registerJob(jobBuilder);
        putJob(jobBuilder);
        registerDatafeed(datafeed);
        putDatafeed(datafeed);
        openJob(jobId);

        // Now let's index the data
        client().admin().indices().prepareCreate(dataIndex)
            .addMapping("type", "time", "type=date")
            .get();

        // Index a doc per second from a minute ago to a minute later
        long now = System.currentTimeMillis();
        long aMinuteAgo = now - TimeValue.timeValueMinutes(1).millis();
        long aMinuteLater = now + TimeValue.timeValueMinutes(1).millis();
        long curTime = aMinuteAgo;
        BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
        while (curTime < aMinuteLater) {
            IndexRequest indexRequest = new IndexRequest(dataIndex);
            indexRequest.source("time", curTime);
            bulkRequestBuilder.add(indexRequest);
            curTime += TimeValue.timeValueSeconds(1).millis();
        }
        BulkResponse bulkResponse = bulkRequestBuilder
            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
            .get();
        if (bulkResponse.hasFailures()) {
            fail("Failed to index docs: " + bulkResponse.buildFailureMessage());
        }

        // And start datafeed in real-time mode
        startDatafeed(datafeedId, 0L, null);

        // Wait until we finalize a bucket after now
        assertBusy(() -> {
            GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
            getBucketsRequest.setExcludeInterim(true);
            getBucketsRequest.setSort("timestamp");
            getBucketsRequest.setDescending(true);
            List<Bucket> buckets = getBuckets(getBucketsRequest);
            assertThat(buckets.size(), greaterThanOrEqualTo(1));
            assertThat(buckets.get(0).getTimestamp().getTime(), greaterThan(now));
        }, 30, TimeUnit.SECONDS);

        // Wrap up
        StopDatafeedAction.Response stopJobResponse = stopDatafeed(datafeedId);
        assertTrue(stopJobResponse.isStopped());
        assertBusy(() -> {
            GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId);
            GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
            assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
        });
        closeJob(jobId);

        // Assert we have not dropped any data - final buckets should contain 2 events each
        GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
        getBucketsRequest.setExcludeInterim(true);
        List<Bucket> buckets = getBuckets(getBucketsRequest);
        for (Bucket bucket : buckets) {
            if (bucket.getEventCount() != 2) {
                fail("Bucket [" + bucket.getTimestamp().getTime() + "] has [" + bucket.getEventCount() + "] when 2 were expected");
            }
        }
    }
}
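Why the final loop expects exactly 2 events per bucket follows from the test's own setup; a short restatement of the arithmetic (illustrative, not part of the committed test):

    // The test indexes one document per second and the job buckets them
    // into 2-second spans:
    long bucketSpanMs = 2_000L;   // analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(2))
    long docIntervalMs = 1_000L;  // one document indexed per second
    long expectedEvents = bucketSpanMs / docIntervalMs; // 2
    // Before this fix, the bucket straddling the end of lookback could lose
    // the events in the skipped interval and report fewer than 2.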

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

Lines changed: 1 addition & 1 deletion
@@ -380,7 +380,7 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) thro
             }
         }

-        lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, end - 1);
+        lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, dataExtractor.getEndTime() - 1);
         LOGGER.debug("[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", jobId, error, recordCount,
             lastEndTimeMs, isRunning(), dataExtractor.isCancelled());
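With the illustrative numbers from the sketch above, and assuming (from how DatafeedJob uses this field elsewhere, not shown in this diff) that the next real-time search resumes at lastEndTimeMs + 1, the one-line change plays out like this:

    Long lastEndTimeMs = null; // no previous end time recorded

    // Before: derived from `end`, which is based on `now` (62_300)
    long beforeFix = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, 62_300L - 1); // 62_299
    // next search would resume at 62_300, so [62_000, 62_300) is never searched

    // After: derived from dataExtractor.getEndTime(), the floor the
    // extractor actually searched to (62_000)
    long afterFix = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, 62_000L - 1); // 61_999
    // next search resumes at 62_000, exactly where the extractor stopped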

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java

Lines changed: 5 additions & 0 deletions
@@ -80,6 +80,11 @@ public void cancel() {
         hasNext = false;
     }

+    @Override
+    public long getEndTime() {
+        return context.end;
+    }
+
     @Override
     public Optional<InputStream> next() throws IOException {
         if (!hasNext()) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java

Lines changed: 6 additions & 1 deletion
@@ -114,7 +114,7 @@ private void setUpChunkedSearch() throws IOException {
             currentEnd = currentStart;
             chunkSpan = context.chunkSpan == null ? dataSummary.estimateChunk() : context.chunkSpan.getMillis();
             chunkSpan = context.timeAligner.alignToCeil(chunkSpan);
-            LOGGER.debug("[{}]Chunked search configured: kind = {}, dataTimeSpread = {} ms, chunk span = {} ms",
+            LOGGER.debug("[{}] Chunked search configured: kind = {}, dataTimeSpread = {} ms, chunk span = {} ms",
                 context.jobId, dataSummary.getClass().getSimpleName(), dataSummary.getDataTimeSpread(), chunkSpan);
         } else {
             // search is over
@@ -170,6 +170,11 @@ public void cancel() {
         isCancelled = true;
     }

+    @Override
+    public long getEndTime() {
+        return context.end;
+    }
+
     ChunkedDataExtractorContext getContext() {
         return context;
     }

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java

Lines changed: 5 additions & 0 deletions
@@ -77,6 +77,11 @@ public void cancel() {
         isCancelled = true;
     }

+    @Override
+    public long getEndTime() {
+        return context.end;
+    }
+
     @Override
     public Optional<InputStream> next() throws IOException {
         if (!hasNext()) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java

Lines changed: 8 additions & 4 deletions
@@ -370,14 +370,16 @@ public void testExtractionProblem() throws Exception {
         verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
     }

-    public void testPostAnalysisProblem() throws Exception {
+    public void testPostAnalysisProblem() {
         client = mock(Client.class);
         ThreadPool threadPool = mock(ThreadPool.class);
         when(client.threadPool()).thenReturn(threadPool);
         when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
         when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
         when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(new RuntimeException());

+        when(dataExtractor.getEndTime()).thenReturn(1000L);
+
         DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1);
         DatafeedJob.AnalysisProblemException analysisProblemException =
             expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L));
@@ -397,14 +399,16 @@ public void testPostAnalysisProblem() throws Exception {
         verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
     }

-    public void testPostAnalysisProblemIsConflict() throws Exception {
+    public void testPostAnalysisProblemIsConflict() {
         client = mock(Client.class);
         ThreadPool threadPool = mock(ThreadPool.class);
         when(client.threadPool()).thenReturn(threadPool);
         when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
         when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
         when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(ExceptionsHelper.conflictStatusException("conflict"));

+        when(dataExtractor.getEndTime()).thenReturn(1000L);
+
         DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1);
         DatafeedJob.AnalysisProblemException analysisProblemException =
             expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L));
@@ -424,7 +428,7 @@ public void testPostAnalysisProblemIsConflict() throws Exception {
         verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
     }

-    public void testFlushAnalysisProblem() throws Exception {
+    public void testFlushAnalysisProblem() {
         when(client.execute(same(FlushJobAction.INSTANCE), any())).thenThrow(new RuntimeException());

         currentTime = 60000L;
@@ -436,7 +440,7 @@ public void testFlushAnalysisProblem() throws Exception {
         assertThat(analysisProblemException.shouldStop, is(false));
     }

-    public void testFlushAnalysisProblemIsConflict() throws Exception {
+    public void testFlushAnalysisProblemIsConflict() {
         when(client.execute(same(FlushJobAction.INSTANCE), any())).thenThrow(ExceptionsHelper.conflictStatusException("conflict"));

         currentTime = 60000L;

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java

Lines changed: 5 additions & 0 deletions
@@ -596,5 +596,10 @@ public boolean isCancelled() {
         public void cancel() {
             // do nothing
         }
+
+        @Override
+        public long getEndTime() {
+            return 0;
+        }
     }
 }
