Skip to content

Commit ea6cfb7

Browse files
authored
[7.x] Make Annotation a result type (#56342) (#57508)
1 parent 9bc9d01 commit ea6cfb7

File tree

8 files changed

+155
-72
lines changed

8 files changed

+155
-72
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/Annotation.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ public String toString() {
5454
}
5555
}
5656

57+
/**
58+
* Result type is needed due to the fact that {@link Annotation} can be returned from C++ as an ML result.
59+
*/
60+
public static final ParseField RESULTS_FIELD = new ParseField("annotation");
61+
5762
public static final ParseField ANNOTATION = new ParseField("annotation");
5863
public static final ParseField CREATE_TIME = new ParseField("create_time");
5964
public static final ParseField CREATE_USERNAME = new ParseField("create_username");
@@ -81,7 +86,8 @@ public static Annotation fromXContent(XContentParser parser, Void context) {
8186
/**
8287
* Strict parser for cases when {@link Annotation} is returned from C++ as an ML result.
8388
*/
84-
private static final ObjectParser<Builder, Void> STRICT_PARSER = new ObjectParser<>(ANNOTATION.getPreferredName(), false, Builder::new);
89+
private static final ObjectParser<Builder, Void> STRICT_PARSER =
90+
new ObjectParser<>(RESULTS_FIELD.getPreferredName(), false, Builder::new);
8591

8692
static {
8793
STRICT_PARSER.declareString(Builder::setAnnotation, ANNOTATION);

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java

+41-18
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
3737
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
3838
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
39+
import org.elasticsearch.xpack.core.ml.annotations.AnnotationTests;
3940
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
4041
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
4142
import org.elasticsearch.xpack.core.ml.job.config.Detector;
@@ -89,13 +90,18 @@
8990
import java.util.concurrent.CountDownLatch;
9091
import java.util.concurrent.atomic.AtomicReference;
9192

93+
import static java.util.stream.Collectors.toList;
9294
import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent;
9395
import static org.hamcrest.Matchers.closeTo;
9496
import static org.hamcrest.Matchers.contains;
97+
import static org.hamcrest.Matchers.containsInAnyOrder;
9598
import static org.hamcrest.Matchers.empty;
9699
import static org.hamcrest.Matchers.equalTo;
100+
import static org.hamcrest.Matchers.everyItem;
97101
import static org.hamcrest.Matchers.hasSize;
98102
import static org.hamcrest.Matchers.is;
103+
import static org.hamcrest.Matchers.not;
104+
import static org.hamcrest.Matchers.startsWith;
99105
import static org.mockito.Mockito.mock;
100106
import static org.mockito.Mockito.never;
101107
import static org.mockito.Mockito.verify;
@@ -168,7 +174,9 @@ public void deleteJob() throws Exception {
168174
AcknowledgedResponse response = client().execute(DeleteJobAction.INSTANCE, request).actionGet();
169175
assertTrue(response.isAcknowledged());
170176
// Verify that deleting job also deletes associated model snapshots annotations
171-
assertThat(getAnnotations(), empty());
177+
assertThat(
178+
getAnnotations().stream().map(Annotation::getAnnotation).collect(toList()),
179+
everyItem(not(startsWith("Job model snapshot"))));
172180
}
173181

174182
public void testProcessResults() throws Exception {
@@ -183,6 +191,8 @@ public void testProcessResults() throws Exception {
183191
resultsBuilder.addCategoryDefinition(categoryDefinition);
184192
ModelPlot modelPlot = createModelPlot();
185193
resultsBuilder.addModelPlot(modelPlot);
194+
Annotation annotation = createAnnotation();
195+
resultsBuilder.addAnnotation(annotation);
186196
ModelSizeStats modelSizeStats = createModelSizeStats();
187197
resultsBuilder.addModelSizeStats(modelSizeStats);
188198
ModelSnapshot modelSnapshot = createModelSnapshot();
@@ -224,17 +234,20 @@ public void testProcessResults() throws Exception {
224234
assertEquals(modelSnapshot, persistedModelSnapshot.results().get(0));
225235
assertEquals(Collections.singletonList(modelSnapshot), capturedUpdateModelSnapshotOnJobRequests);
226236

227-
// Verify that creating model snapshot also creates associated annotation
228-
List<Annotation> annotations = getAnnotations();
229-
assertThat(annotations, hasSize(1));
230-
assertThat(
231-
annotations.get(0).getAnnotation(),
232-
is(equalTo(
233-
new ParameterizedMessage("Job model snapshot with id [{}] stored", modelSnapshot.getSnapshotId()).getFormattedMessage())));
234-
235237
Optional<Quantiles> persistedQuantiles = getQuantiles();
236238
assertTrue(persistedQuantiles.isPresent());
237239
assertEquals(quantiles, persistedQuantiles.get());
240+
241+
// Verify that there are two annotations:
242+
// 1. one related to creating model snapshot
243+
// 2. one for {@link Annotation} result
244+
List<Annotation> annotations = getAnnotations();
245+
assertThat("Annotations were: " + annotations.toString(), annotations, hasSize(2));
246+
assertThat(
247+
annotations.stream().map(Annotation::getAnnotation).collect(toList()),
248+
containsInAnyOrder(
249+
new ParameterizedMessage("Job model snapshot with id [{}] stored", modelSnapshot.getSnapshotId()).getFormattedMessage(),
250+
annotation.getAnnotation()));
238251
}
239252

240253
public void testProcessResults_ModelSnapshot() throws Exception {
@@ -466,6 +479,10 @@ private static ModelPlot createModelPlot() {
466479
return new ModelPlotTests().createTestInstance(JOB_ID);
467480
}
468481

482+
private static Annotation createAnnotation() {
483+
return AnnotationTests.randomAnnotation(JOB_ID);
484+
}
485+
469486
private static ModelSizeStats createModelSizeStats() {
470487
ModelSizeStats.Builder builder = new ModelSizeStats.Builder(JOB_ID);
471488
builder.setTimestamp(randomDate());
@@ -500,47 +517,53 @@ private static class ResultsBuilder {
500517
private final List<AutodetectResult> results = new ArrayList<>();
501518

502519
ResultsBuilder addBucket(Bucket bucket) {
503-
results.add(new AutodetectResult(Objects.requireNonNull(bucket), null, null, null, null, null, null, null, null, null, null));
520+
results.add(
521+
new AutodetectResult(Objects.requireNonNull(bucket), null, null, null, null, null, null, null, null, null, null, null));
504522
return this;
505523
}
506524

507525
ResultsBuilder addRecords(List<AnomalyRecord> records) {
508-
results.add(new AutodetectResult(null, records, null, null, null, null, null, null, null, null, null));
526+
results.add(new AutodetectResult(null, records, null, null, null, null, null, null, null, null, null, null));
509527
return this;
510528
}
511529

512530
ResultsBuilder addInfluencers(List<Influencer> influencers) {
513-
results.add(new AutodetectResult(null, null, influencers, null, null, null, null, null, null, null, null));
531+
results.add(new AutodetectResult(null, null, influencers, null, null, null, null, null, null, null, null, null));
514532
return this;
515533
}
516534

517535
ResultsBuilder addCategoryDefinition(CategoryDefinition categoryDefinition) {
518-
results.add(new AutodetectResult(null, null, null, null, null, null, null, null, null, categoryDefinition, null));
536+
results.add(new AutodetectResult(null, null, null, null, null, null, null, null, null, null, categoryDefinition, null));
519537
return this;
520538
}
521539

522540
ResultsBuilder addModelPlot(ModelPlot modelPlot) {
523-
results.add(new AutodetectResult(null, null, null, null, null, null, modelPlot, null, null, null, null));
541+
results.add(new AutodetectResult(null, null, null, null, null, null, modelPlot, null, null, null, null, null));
542+
return this;
543+
}
544+
545+
ResultsBuilder addAnnotation(Annotation annotation) {
546+
results.add(new AutodetectResult(null, null, null, null, null, null, null, annotation, null, null, null, null));
524547
return this;
525548
}
526549

527550
ResultsBuilder addModelSizeStats(ModelSizeStats modelSizeStats) {
528-
results.add(new AutodetectResult(null, null, null, null, null, modelSizeStats, null, null, null, null, null));
551+
results.add(new AutodetectResult(null, null, null, null, null, modelSizeStats, null, null, null, null, null, null));
529552
return this;
530553
}
531554

532555
ResultsBuilder addModelSnapshot(ModelSnapshot modelSnapshot) {
533-
results.add(new AutodetectResult(null, null, null, null, modelSnapshot, null, null, null, null, null, null));
556+
results.add(new AutodetectResult(null, null, null, null, modelSnapshot, null, null, null, null, null, null, null));
534557
return this;
535558
}
536559

537560
ResultsBuilder addQuantiles(Quantiles quantiles) {
538-
results.add(new AutodetectResult(null, null, null, quantiles, null, null, null, null, null, null, null));
561+
results.add(new AutodetectResult(null, null, null, quantiles, null, null, null, null, null, null, null, null));
539562
return this;
540563
}
541564

542565
ResultsBuilder addFlushAcknowledgement(FlushAcknowledgement flushAcknowledgement) {
543-
results.add(new AutodetectResult(null, null, null, null, null, null, null, null, null, null, flushAcknowledgement));
566+
results.add(new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null, flushAcknowledgement));
544567
return this;
545568
}
546569

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void writeRecord(String[] record) {
7171
if (Arrays.asList(record).contains(MAGIC_FAILURE_VALUE)) {
7272
open = false;
7373
onProcessCrash.accept("simulated failure");
74-
AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null);
74+
AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null, null);
7575
results.add(result);
7676
}
7777
}
@@ -104,7 +104,8 @@ public void writeUpdateScheduledEventsMessage(List<ScheduledEvent> events, TimeV
104104
@Override
105105
public String flushJob(FlushJobParams params) throws IOException {
106106
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, null);
107-
AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, flushAcknowledgement);
107+
AutodetectResult result =
108+
new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null,flushAcknowledgement);
108109
results.add(result);
109110
return FLUSH_ID;
110111
}
@@ -121,7 +122,7 @@ public void flushStream() {
121122
public void close() throws IOException {
122123
if (open) {
123124
Quantiles quantiles = new Quantiles(jobId, new Date(), "black hole quantiles");
124-
AutodetectResult result = new AutodetectResult(null, null, null, quantiles, null, null, null, null, null, null, null);
125+
AutodetectResult result = new AutodetectResult(null, null, null, quantiles, null, null, null, null, null, null, null, null);
125126
results.add(result);
126127
open = false;
127128
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ public class AutodetectResultProcessor {
8888
private final String jobId;
8989
private final Renormalizer renormalizer;
9090
private final JobResultsPersister persister;
91-
private final AnnotationPersister annotationPersister;
9291
private final AutodetectProcess process;
9392
private final TimingStatsReporter timingStatsReporter;
9493
private final Clock clock;
@@ -102,6 +101,7 @@ public class AutodetectResultProcessor {
102101
private final long priorRunsBucketCount;
103102
private long currentRunBucketCount; // only used from the process() thread, so doesn't need to be volatile
104103
private final JobResultsPersister.Builder bulkResultsPersister;
104+
private final AnnotationPersister.Builder bulkAnnotationsPersister;
105105
private boolean deleteInterimRequired;
106106

107107
/**
@@ -131,11 +131,11 @@ public AutodetectResultProcessor(Client client,
131131
this.jobId = Objects.requireNonNull(jobId);
132132
this.renormalizer = Objects.requireNonNull(renormalizer);
133133
this.persister = Objects.requireNonNull(persister);
134-
this.annotationPersister = Objects.requireNonNull(annotationPersister);
135134
this.process = Objects.requireNonNull(autodetectProcess);
136135
this.flushListener = Objects.requireNonNull(flushListener);
137136
this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats);
138137
this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId).shouldRetry(this::isAlive);
138+
this.bulkAnnotationsPersister = annotationPersister.bulkPersisterBuilder(jobId).shouldRetry(this::isAlive);
139139
this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister);
140140
this.clock = Objects.requireNonNull(clock);
141141
this.deleteInterimRequired = true;
@@ -155,6 +155,7 @@ public void process() {
155155
if (processKilled == false) {
156156
timingStatsReporter.finishReporting();
157157
bulkResultsPersister.executeRequest();
158+
bulkAnnotationsPersister.executeRequest();
158159
}
159160
} catch (Exception e) {
160161
LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
@@ -253,6 +254,7 @@ void processResult(AutodetectResult result) {
253254
// results are also interim
254255
timingStatsReporter.reportBucket(bucket);
255256
bulkResultsPersister.persistBucket(bucket).executeRequest();
257+
bulkAnnotationsPersister.executeRequest();
256258
++currentRunBucketCount;
257259
}
258260
List<AnomalyRecord> records = result.getRecords();
@@ -271,6 +273,10 @@ void processResult(AutodetectResult result) {
271273
if (modelPlot != null) {
272274
bulkResultsPersister.persistModelPlot(modelPlot);
273275
}
276+
Annotation annotation = result.getAnnotation();
277+
if (annotation != null) {
278+
bulkAnnotationsPersister.persistAnnotation(annotation);
279+
}
274280
Forecast forecast = result.getForecast();
275281
if (forecast != null) {
276282
bulkResultsPersister.persistForecast(forecast);
@@ -313,7 +319,7 @@ void processResult(AutodetectResult result) {
313319
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
314320
updateModelSnapshotOnJob(modelSnapshot);
315321
}
316-
annotationPersister.persistAnnotation(
322+
bulkAnnotationsPersister.persistAnnotation(
317323
ModelSnapshot.annotationDocumentId(modelSnapshot), createModelSnapshotAnnotation(modelSnapshot));
318324
}
319325
Quantiles quantiles = result.getQuantiles();
@@ -338,6 +344,7 @@ void processResult(AutodetectResult result) {
338344
Exception exception = null;
339345
try {
340346
bulkResultsPersister.executeRequest();
347+
bulkAnnotationsPersister.executeRequest();
341348
persister.commitResultWrites(jobId);
342349
LOGGER.debug("[{}] Flush acknowledgement sent to listener for ID {}", jobId, flushAcknowledgement.getId());
343350
} catch (Exception e) {

0 commit comments

Comments
 (0)