Skip to content

Commit be140cc

Browse files
committed
addressing (#36891)(#36888)(#36889) (#37080)
1 parent ffc62d0 commit be140cc

File tree

2 files changed

+41
-18
lines changed

2 files changed

+41
-18
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3333
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
3434
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
35+
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
3536
import org.elasticsearch.xpack.core.security.user.SystemUser;
3637
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
3738
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
@@ -190,24 +191,27 @@ private void checkForMissingDataIfNecessary() {
190191
long totalRecordsMissing = missingDataBuckets.stream()
191192
.mapToLong(BucketWithMissingData::getMissingDocumentCount)
192193
.sum();
193-
Date endTime = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket().getTimestamp();
194-
Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(),
195-
endTime,
196-
totalRecordsMissing);
194+
Bucket lastBucket = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket();
195+
// Get the end of the last bucket and make it milliseconds
196+
Date endTime = new Date((lastBucket.getEpoch() + lastBucket.getBucketSpan()) * 1000);
197+
198+
String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing,
199+
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(lastBucket.getTimestamp().getTime()));
200+
201+
Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(), endTime, msg);
197202

198203
// Have we an annotation that covers the same area with the same message?
199204
// Cannot use annotation.equals(other) as that checks createTime
200205
if (lastDataCheckAnnotation != null
201206
&& annotation.getAnnotation().equals(lastDataCheckAnnotation.getAnnotation())
202207
&& annotation.getTimestamp().equals(lastDataCheckAnnotation.getTimestamp())
203-
&& annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getTimestamp())) {
208+
&& annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getEndTimestamp())) {
204209
return;
205210
}
206211

207212
// Creating a warning in addition to updating/creating our annotation. This allows the issue to be plainly visible
208213
// in the job list page.
209-
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing,
210-
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(endTime.getTime())));
214+
auditor.warning(jobId, msg);
211215

212216
if (lastDataCheckAnnotationId != null) {
213217
updateAnnotation(annotation);
@@ -218,17 +222,16 @@ private void checkForMissingDataIfNecessary() {
218222
}
219223
}
220224

221-
private Annotation createAnnotation(Date startTime, Date endTime, long recordsMissing) {
222-
String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, recordsMissing,
223-
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(endTime.getTime()));
225+
private Annotation createAnnotation(Date startTime, Date endTime, String msg) {
226+
Date currentTime = new Date(currentTimeSupplier.get());
224227
return new Annotation(msg,
225-
new Date(currentTimeSupplier.get()),
228+
currentTime,
226229
SystemUser.NAME,
227230
startTime,
228231
endTime,
229232
jobId,
230-
null,
231-
null,
233+
currentTime,
234+
SystemUser.NAME,
232235
"annotation");
233236
}
234237

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.io.IOException;
4747
import java.io.InputStream;
4848
import java.nio.charset.StandardCharsets;
49+
import java.util.Arrays;
4950
import java.util.Collections;
5051
import java.util.Date;
5152
import java.util.List;
@@ -228,6 +229,8 @@ public void testRealtimeRun() throws Exception {
228229
flushJobResponse = new FlushJobAction.Response(true, new Date(2000));
229230
Bucket bucket = mock(Bucket.class);
230231
when(bucket.getTimestamp()).thenReturn(new Date(2000));
232+
when(bucket.getEpoch()).thenReturn(2L);
233+
when(bucket.getBucketSpan()).thenReturn(4L);
231234
when(flushJobFuture.actionGet()).thenReturn(flushJobResponse);
232235
when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture);
233236
when(delayedDataDetector.detectMissingData(2000))
@@ -272,10 +275,10 @@ public void testRealtimeRun() throws Exception {
272275
new Date(currentTime),
273276
SystemUser.NAME,
274277
bucket.getTimestamp(),
275-
bucket.getTimestamp(),
278+
new Date((bucket.getEpoch() + bucket.getBucketSpan()) * 1000),
276279
jobId,
277-
null,
278-
null,
280+
new Date(currentTime),
281+
SystemUser.NAME,
279282
"annotation");
280283

281284
IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME, ElasticsearchMappings.DOC_TYPE);
@@ -288,8 +291,13 @@ public void testRealtimeRun() throws Exception {
288291
assertThat(request.source(), equalTo(indexRequestArgumentCaptor.getValue().source()));
289292

290293
// Execute a fourth time, this time we return a new delayedDataDetector response to verify annotation gets updated
294+
Bucket bucket2 = mock(Bucket.class);
295+
when(bucket2.getTimestamp()).thenReturn(new Date(6000));
296+
when(bucket2.getEpoch()).thenReturn(6L);
297+
when(bucket2.getBucketSpan()).thenReturn(4L);
291298
when(delayedDataDetector.detectMissingData(2000))
292-
.thenReturn(Collections.singletonList(BucketWithMissingData.fromMissingAndBucket(15, bucket)));
299+
.thenReturn(Arrays.asList(BucketWithMissingData.fromMissingAndBucket(10, bucket),
300+
BucketWithMissingData.fromMissingAndBucket(5, bucket2)));
293301
currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
294302
inputStream = new ByteArrayInputStream(contentBytes);
295303
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
@@ -299,14 +307,15 @@ public void testRealtimeRun() throws Exception {
299307

300308
msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA,
301309
15,
302-
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(2000));
310+
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(6000));
303311
// What we expect the updated annotation to be indexed as
304312
IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME, ElasticsearchMappings.DOC_TYPE);
305313
indexRequest.id(annotationDocId);
306314
Annotation updatedAnnotation = new Annotation(expectedAnnotation);
307315
updatedAnnotation.setAnnotation(msg);
308316
updatedAnnotation.setModifiedTime(new Date(currentTime));
309317
updatedAnnotation.setModifiedUsername(SystemUser.NAME);
318+
updatedAnnotation.setEndTimestamp(new Date((bucket2.getEpoch() + bucket2.getBucketSpan()) * 1000));
310319
try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
311320
indexRequest.source(xContentBuilder);
312321
}
@@ -319,6 +328,17 @@ public void testRealtimeRun() throws Exception {
319328
assertThat(indexRequest.source().utf8ToString(),
320329
equalTo(updateRequestArgumentCaptor.getValue().source().utf8ToString()));
321330
assertThat(updateRequestArgumentCaptor.getValue().opType(), equalTo(DocWriteRequest.OpType.INDEX));
331+
332+
// Execute a fifth time, no changes should occur as annotation is the same
333+
currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
334+
inputStream = new ByteArrayInputStream(contentBytes);
335+
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
336+
when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
337+
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
338+
datafeedJob.runRealtime();
339+
340+
// We should not get 3 index requests for the annotations
341+
verify(client, atMost(2)).index(any());
322342
}
323343

324344
public void testEmptyDataCountGivenlookback() throws Exception {

0 commit comments

Comments
 (0)