Skip to content

Commit 4a3ca41

Browse files
authored
[ML] Switch poor categorization audit warning to use status field (#52195)
In #51146 a rudimentary check for poor categorization was added to 7.6. This change replaces that warning based on a Java-side check with a new one based on the categorization_status field that the ML C++ sets. categorization_status was added in 7.7 and above by #51879, so this new warning based on more advanced conditions will also be in 7.7 and above. Closes #50749
1 parent 74ae1d6 commit 4a3ca41

File tree

3 files changed

+57
-90
lines changed

3 files changed

+57
-90
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public final class Messages {
135135
"Adjust the analysis_limits.model_memory_limit setting to ensure all data is analyzed";
136136
public static final String JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT_PRE_7_2 = "Job memory status changed to hard_limit at {0}; adjust the " +
137137
"analysis_limits.model_memory_limit setting to ensure all data is analyzed";
138-
public static final String JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES = "{0} categories observed in the first [{1}] buckets." +
138+
public static final String JOB_AUDIT_CATEGORIZATION_STATUS_WARN = "categorization_status changed to [{0}] after [{1}] buckets." +
139139
" This suggests an inappropriate categorization_field_name has been chosen.";
140140

141141
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

+14-22
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,6 @@ public class AutodetectResultProcessor {
7474

7575
private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class);
7676

77-
static final long EARLY_BUCKET_THRESHOLD = 100;
78-
static final int EXCESSIVE_EARLY_CATEGORY_COUNT = 1000;
79-
8077
private final Client client;
8178
private final AnomalyDetectionAuditor auditor;
8279
private final String jobId;
@@ -90,9 +87,8 @@ public class AutodetectResultProcessor {
9087
private final FlushListener flushListener;
9188
private volatile boolean processKilled;
9289
private volatile boolean failed;
93-
private long priorRunsBucketCount;
90+
private final long priorRunsBucketCount;
9491
private long currentRunBucketCount; // only used from the process() thread, so doesn't need to be volatile
95-
private boolean excessiveCategoryWarningIssued; // only used from the process() thread, so doesn't need to be volatile
9692
private final JobResultsPersister.Builder bulkResultsPersister;
9793
private boolean deleteInterimRequired;
9894

@@ -230,7 +226,7 @@ void processResult(AutodetectResult result) {
230226
}
231227
CategoryDefinition categoryDefinition = result.getCategoryDefinition();
232228
if (categoryDefinition != null) {
233-
processCategoryDefinition(categoryDefinition);
229+
persister.persistCategoryDefinition(categoryDefinition, this::isAlive);
234230
}
235231
ModelPlot modelPlot = result.getModelPlot();
236232
if (modelPlot != null) {
@@ -314,22 +310,6 @@ void processResult(AutodetectResult result) {
314310
}
315311
}
316312

317-
private void processCategoryDefinition(CategoryDefinition categoryDefinition) {
318-
persister.persistCategoryDefinition(categoryDefinition, this::isAlive);
319-
if (categoryDefinition.getCategoryId() == EXCESSIVE_EARLY_CATEGORY_COUNT &&
320-
priorRunsBucketCount + currentRunBucketCount < EARLY_BUCKET_THRESHOLD &&
321-
excessiveCategoryWarningIssued == false) {
322-
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES, EXCESSIVE_EARLY_CATEGORY_COUNT,
323-
// Add 1 because category definitions are written before buckets
324-
1L + priorRunsBucketCount + currentRunBucketCount));
325-
// This flag won't be retained if the job is closed and reopened, or if the job migrates to another node.
326-
// This means it's possible the audit message is generated multiple times. However, that's not a
327-
// disaster, and is also very unlikely in the the (best practice) cases where initial lookback covers
328-
// more than 100 buckets.
329-
excessiveCategoryWarningIssued = true;
330-
}
331-
}
332-
333313
private void processModelSizeStats(ModelSizeStats modelSizeStats) {
334314
LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}",
335315
jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(),
@@ -338,6 +318,8 @@ private void processModelSizeStats(ModelSizeStats modelSizeStats) {
338318

339319
persister.persistModelSizeStats(modelSizeStats, this::isAlive);
340320
notifyModelMemoryStatusChange(modelSizeStats);
321+
notifyCategorizationStatusChange(modelSizeStats);
322+
341323
latestModelSizeStats = modelSizeStats;
342324
}
343325

@@ -359,6 +341,16 @@ private void notifyModelMemoryStatusChange(ModelSizeStats modelSizeStats) {
359341
}
360342
}
361343

344+
private void notifyCategorizationStatusChange(ModelSizeStats modelSizeStats) {
345+
ModelSizeStats.CategorizationStatus categorizationStatus = modelSizeStats.getCategorizationStatus();
346+
if (categorizationStatus != latestModelSizeStats.getCategorizationStatus()) {
347+
if (categorizationStatus == ModelSizeStats.CategorizationStatus.WARN) {
348+
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, categorizationStatus,
349+
priorRunsBucketCount + currentRunBucketCount));
350+
}
351+
}
352+
}
353+
362354
protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
363355
JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build();
364356
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java

+42-67
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.elasticsearch.common.settings.Settings;
1818
import org.elasticsearch.common.unit.ByteSizeUnit;
1919
import org.elasticsearch.common.unit.ByteSizeValue;
20-
import org.elasticsearch.common.unit.TimeValue;
2120
import org.elasticsearch.common.util.concurrent.ThreadContext;
2221
import org.elasticsearch.index.shard.ShardId;
2322
import org.elasticsearch.test.ESTestCase;
@@ -52,14 +51,12 @@
5251
import java.util.Iterator;
5352
import java.util.List;
5453
import java.util.concurrent.ScheduledThreadPoolExecutor;
55-
import java.util.concurrent.TimeUnit;
5654
import java.util.concurrent.TimeoutException;
5755

5856
import static org.hamcrest.Matchers.equalTo;
5957
import static org.hamcrest.Matchers.is;
6058
import static org.hamcrest.Matchers.nullValue;
6159
import static org.mockito.Matchers.any;
62-
import static org.mockito.Matchers.anyString;
6360
import static org.mockito.Matchers.eq;
6461
import static org.mockito.Matchers.same;
6562
import static org.mockito.Mockito.doThrow;
@@ -213,65 +210,6 @@ public void testProcessResult_categoryDefinition() {
213210
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
214211
}
215212

216-
public void testProcessResult_excessiveCategoryDefinitionCountEarly() {
217-
int iterations = 3;
218-
int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2;
219-
220-
processorUnderTest.setDeleteInterimRequired(false);
221-
222-
AutodetectResult result = mock(AutodetectResult.class);
223-
for (int iteration = 1; iteration <= iterations; ++iteration) {
224-
for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) {
225-
CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID);
226-
categoryDefinition.setCategoryId(categoryId);
227-
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
228-
229-
processorUnderTest.processResult(result);
230-
}
231-
}
232-
233-
verify(bulkBuilder, never()).executeRequest();
234-
verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any());
235-
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
236-
verify(auditor).warning(eq(JOB_ID), eq(Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES,
237-
AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT, 1)));
238-
}
239-
240-
public void testProcessResult_highCategoryDefinitionCountLateOn() {
241-
int iterations = 3;
242-
int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2;
243-
244-
processorUnderTest.setDeleteInterimRequired(false);
245-
246-
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
247-
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
248-
249-
AutodetectResult bucketResult = mock(AutodetectResult.class);
250-
final int numPriorBuckets = (int) AutodetectResultProcessor.EARLY_BUCKET_THRESHOLD + 1;
251-
for (int i = 0; i < numPriorBuckets; ++i) {
252-
Bucket bucket = new Bucket(JOB_ID, new Date(i * 1000 + 1000000), BUCKET_SPAN_MS);
253-
when(bucketResult.getBucket()).thenReturn(bucket);
254-
processorUnderTest.processResult(bucketResult);
255-
}
256-
257-
AutodetectResult categoryResult = mock(AutodetectResult.class);
258-
for (int iteration = 1; iteration <= iterations; ++iteration) {
259-
for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) {
260-
CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID);
261-
categoryDefinition.setCategoryId(categoryId);
262-
when(categoryResult.getCategoryDefinition()).thenReturn(categoryDefinition);
263-
processorUnderTest.processResult(categoryResult);
264-
}
265-
}
266-
267-
verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
268-
verify(bulkBuilder, times(numPriorBuckets)).persistBucket(any(Bucket.class));
269-
verify(bulkBuilder, times(numPriorBuckets)).executeRequest();
270-
verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any());
271-
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
272-
verify(auditor, never()).warning(eq(JOB_ID), anyString());
273-
}
274-
275213
public void testProcessResult_flushAcknowledgement() {
276214
AutodetectResult result = mock(AutodetectResult.class);
277215
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
@@ -335,11 +273,6 @@ public void testProcessResult_modelSizeStats() {
335273
}
336274

337275
public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() {
338-
TimeValue delay = TimeValue.timeValueSeconds(5);
339-
// Set up schedule delay time
340-
when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString()))
341-
.thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[0], delay.nanos(), TimeUnit.NANOSECONDS));
342-
343276
AutodetectResult result = mock(AutodetectResult.class);
344277
processorUnderTest.setDeleteInterimRequired(false);
345278

@@ -374,6 +307,48 @@ public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() {
374307
verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb"));
375308
}
376309

310+
public void testProcessResult_modelSizeStatsWithCategorizationStatusChanges() {
311+
AutodetectResult result = mock(AutodetectResult.class);
312+
processorUnderTest.setDeleteInterimRequired(false);
313+
314+
// First one with ok
315+
ModelSizeStats modelSizeStats =
316+
new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(ModelSizeStats.CategorizationStatus.OK).build();
317+
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
318+
processorUnderTest.processResult(result);
319+
320+
// Now one with warn
321+
modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(ModelSizeStats.CategorizationStatus.WARN).build();
322+
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
323+
processorUnderTest.processResult(result);
324+
325+
// Another with warn
326+
modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(ModelSizeStats.CategorizationStatus.WARN).build();
327+
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
328+
processorUnderTest.processResult(result);
329+
330+
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
331+
verify(persister, times(3)).persistModelSizeStats(any(ModelSizeStats.class), any());
332+
// We should have only fired one notification; only the change from ok to warn should have fired, not the subsequent warn
333+
verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, "warn", 0));
334+
}
335+
336+
public void testProcessResult_modelSizeStatsWithFirstCategorizationStatusWarn() {
337+
AutodetectResult result = mock(AutodetectResult.class);
338+
processorUnderTest.setDeleteInterimRequired(false);
339+
340+
// First one with warn - this works because a default constructed ModelSizeStats has CategorizationStatus.OK
341+
ModelSizeStats modelSizeStats =
342+
new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(ModelSizeStats.CategorizationStatus.WARN).build();
343+
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
344+
processorUnderTest.processResult(result);
345+
346+
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
347+
verify(persister).persistModelSizeStats(any(ModelSizeStats.class), any());
348+
// We should have only fired one notification; only the change from ok to warn should have fired, not the subsequent warn
349+
verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, "warn", 0));
350+
}
351+
377352
public void testProcessResult_modelSnapshot() {
378353
AutodetectResult result = mock(AutodetectResult.class);
379354
ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)

0 commit comments

Comments
 (0)