Skip to content

Commit cfc3107

Browse files
authored
addressing (#36891)(#36888)(#36889) (#37080)
1 parent b04b317 commit cfc3107

File tree

2 files changed

+41
-18
lines changed

2 files changed

+41
-18
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
3232
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3333
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
34+
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
3435
import org.elasticsearch.xpack.core.security.user.SystemUser;
3536
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
3637
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
@@ -189,24 +190,27 @@ private void checkForMissingDataIfNecessary() {
189190
long totalRecordsMissing = missingDataBuckets.stream()
190191
.mapToLong(BucketWithMissingData::getMissingDocumentCount)
191192
.sum();
192-
Date endTime = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket().getTimestamp();
193-
Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(),
194-
endTime,
195-
totalRecordsMissing);
193+
Bucket lastBucket = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket();
194+
// Get the end of the last bucket and make it milliseconds
195+
Date endTime = new Date((lastBucket.getEpoch() + lastBucket.getBucketSpan()) * 1000);
196+
197+
String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing,
198+
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(lastBucket.getTimestamp().getTime()));
199+
200+
Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(), endTime, msg);
196201

197202
// Have we an annotation that covers the same area with the same message?
198203
// Cannot use annotation.equals(other) as that checks createTime
199204
if (lastDataCheckAnnotation != null
200205
&& annotation.getAnnotation().equals(lastDataCheckAnnotation.getAnnotation())
201206
&& annotation.getTimestamp().equals(lastDataCheckAnnotation.getTimestamp())
202-
&& annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getTimestamp())) {
207+
&& annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getEndTimestamp())) {
203208
return;
204209
}
205210

206211
// Creating a warning in addition to updating/creating our annotation. This allows the issue to be plainly visible
207212
// in the job list page.
208-
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing,
209-
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(endTime.getTime())));
213+
auditor.warning(jobId, msg);
210214

211215
if (lastDataCheckAnnotationId != null) {
212216
updateAnnotation(annotation);
@@ -217,17 +221,16 @@ private void checkForMissingDataIfNecessary() {
217221
}
218222
}
219223

220-
private Annotation createAnnotation(Date startTime, Date endTime, long recordsMissing) {
221-
String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, recordsMissing,
222-
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(endTime.getTime()));
224+
private Annotation createAnnotation(Date startTime, Date endTime, String msg) {
225+
Date currentTime = new Date(currentTimeSupplier.get());
223226
return new Annotation(msg,
224-
new Date(currentTimeSupplier.get()),
227+
currentTime,
225228
SystemUser.NAME,
226229
startTime,
227230
endTime,
228231
jobId,
229-
null,
230-
null,
232+
currentTime,
233+
SystemUser.NAME,
231234
"annotation");
232235
}
233236

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.io.IOException;
4646
import java.io.InputStream;
4747
import java.nio.charset.StandardCharsets;
48+
import java.util.Arrays;
4849
import java.util.Collections;
4950
import java.util.Date;
5051
import java.util.List;
@@ -226,6 +227,8 @@ public void testRealtimeRun() throws Exception {
226227
flushJobResponse = new FlushJobAction.Response(true, new Date(2000));
227228
Bucket bucket = mock(Bucket.class);
228229
when(bucket.getTimestamp()).thenReturn(new Date(2000));
230+
when(bucket.getEpoch()).thenReturn(2L);
231+
when(bucket.getBucketSpan()).thenReturn(4L);
229232
when(flushJobFuture.actionGet()).thenReturn(flushJobResponse);
230233
when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture);
231234
when(delayedDataDetector.detectMissingData(2000))
@@ -270,10 +273,10 @@ public void testRealtimeRun() throws Exception {
270273
new Date(currentTime),
271274
SystemUser.NAME,
272275
bucket.getTimestamp(),
273-
bucket.getTimestamp(),
276+
new Date((bucket.getEpoch() + bucket.getBucketSpan()) * 1000),
274277
jobId,
275-
null,
276-
null,
278+
new Date(currentTime),
279+
SystemUser.NAME,
277280
"annotation");
278281

279282
IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
@@ -286,8 +289,13 @@ public void testRealtimeRun() throws Exception {
286289
assertThat(request.source(), equalTo(indexRequestArgumentCaptor.getValue().source()));
287290

288291
// Execute a fourth time, this time we return a new delayedDataDetector response to verify annotation gets updated
292+
Bucket bucket2 = mock(Bucket.class);
293+
when(bucket2.getTimestamp()).thenReturn(new Date(6000));
294+
when(bucket2.getEpoch()).thenReturn(6L);
295+
when(bucket2.getBucketSpan()).thenReturn(4L);
289296
when(delayedDataDetector.detectMissingData(2000))
290-
.thenReturn(Collections.singletonList(BucketWithMissingData.fromMissingAndBucket(15, bucket)));
297+
.thenReturn(Arrays.asList(BucketWithMissingData.fromMissingAndBucket(10, bucket),
298+
BucketWithMissingData.fromMissingAndBucket(5, bucket2)));
291299
currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
292300
inputStream = new ByteArrayInputStream(contentBytes);
293301
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
@@ -297,14 +305,15 @@ public void testRealtimeRun() throws Exception {
297305

298306
msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA,
299307
15,
300-
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(2000));
308+
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(6000));
301309
// What we expect the updated annotation to be indexed as
302310
IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
303311
indexRequest.id(annotationDocId);
304312
Annotation updatedAnnotation = new Annotation(expectedAnnotation);
305313
updatedAnnotation.setAnnotation(msg);
306314
updatedAnnotation.setModifiedTime(new Date(currentTime));
307315
updatedAnnotation.setModifiedUsername(SystemUser.NAME);
316+
updatedAnnotation.setEndTimestamp(new Date((bucket2.getEpoch() + bucket2.getBucketSpan()) * 1000));
308317
try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
309318
indexRequest.source(xContentBuilder);
310319
}
@@ -317,6 +326,17 @@ public void testRealtimeRun() throws Exception {
317326
assertThat(indexRequest.source().utf8ToString(),
318327
equalTo(updateRequestArgumentCaptor.getValue().source().utf8ToString()));
319328
assertThat(updateRequestArgumentCaptor.getValue().opType(), equalTo(DocWriteRequest.OpType.INDEX));
329+
330+
// Execute a fifth time, no changes should occur as annotation is the same
331+
currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
332+
inputStream = new ByteArrayInputStream(contentBytes);
333+
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
334+
when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
335+
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
336+
datafeedJob.runRealtime();
337+
338+
// We should not get 3 index requests for the annotations
339+
verify(client, atMost(2)).index(any());
320340
}
321341

322342
public void testEmptyDataCountGivenlookback() throws Exception {

0 commit comments

Comments
 (0)