Skip to content

Commit 295665b

Browse files
committed
[ML] Add audit warning for 1000 categories found early in job (#51146)
If 1000 different category definitions are created for a job in the first 100 buckets it processes, an audit warning will now be created. (This will cause a yellow warning triangle in the ML UI's jobs list.) Such a large number of categories suggests that the field that categorization is working on is not well suited to the ML categorization functionality.
1 parent da73c91 commit 295665b

File tree

3 files changed

+104
-19
lines changed

3 files changed

+104
-19
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ public final class Messages {
135135
"Adjust the analysis_limits.model_memory_limit setting to ensure all data is analyzed";
136136
public static final String JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT_PRE_7_2 = "Job memory status changed to hard_limit at {0}; adjust the " +
137137
"analysis_limits.model_memory_limit setting to ensure all data is analyzed";
138+
public static final String JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES = "{0} categories observed in the first [{1}] buckets." +
139+
" This suggests an inappropriate categorization_field_name has been chosen.";
138140

139141
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates";
140142
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_EMPTY =

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ public class AutodetectResultProcessor {
7474

7575
private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class);
7676

77+
static final long EARLY_BUCKET_THRESHOLD = 100;
78+
static final int EXCESSIVE_EARLY_CATEGORY_COUNT = 1000;
79+
7780
private final Client client;
7881
private final AnomalyDetectionAuditor auditor;
7982
private final String jobId;
@@ -87,7 +90,9 @@ public class AutodetectResultProcessor {
8790
private final FlushListener flushListener;
8891
private volatile boolean processKilled;
8992
private volatile boolean failed;
90-
private int bucketCount; // only used from the process() thread, so doesn't need to be volatile
93+
private long priorRunsBucketCount;
94+
private long currentRunBucketCount; // only used from the process() thread, so doesn't need to be volatile
95+
private boolean excessiveCategoryWarningIssued; // only used from the process() thread, so doesn't need to be volatile
9196
private final JobResultsPersister.Builder bulkResultsPersister;
9297
private boolean deleteInterimRequired;
9398

@@ -122,6 +127,7 @@ public AutodetectResultProcessor(Client client,
122127
this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId, this::isAlive);
123128
this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister);
124129
this.deleteInterimRequired = true;
130+
this.priorRunsBucketCount = timingStats.getBucketCount();
125131
}
126132

127133
public void process() {
@@ -140,7 +146,7 @@ public void process() {
140146
} catch (Exception e) {
141147
LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
142148
}
143-
LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount);
149+
LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, currentRunBucketCount);
144150

145151
} catch (Exception e) {
146152
failed = true;
@@ -166,15 +172,15 @@ public void process() {
166172
}
167173

168174
private void readResults() {
169-
bucketCount = 0;
175+
currentRunBucketCount = 0;
170176
try {
171177
Iterator<AutodetectResult> iterator = process.readAutodetectResults();
172178
while (iterator.hasNext()) {
173179
try {
174180
AutodetectResult result = iterator.next();
175181
processResult(result);
176182
if (result.getBucket() != null) {
177-
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
183+
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, currentRunBucketCount);
178184
}
179185
} catch (Exception e) {
180186
if (isAlive() == false) {
@@ -212,7 +218,7 @@ void processResult(AutodetectResult result) {
212218
// results are also interim
213219
timingStatsReporter.reportBucket(bucket);
214220
bulkResultsPersister.persistBucket(bucket).executeRequest();
215-
++bucketCount;
221+
++currentRunBucketCount;
216222
}
217223
List<AnomalyRecord> records = result.getRecords();
218224
if (records != null && !records.isEmpty()) {
@@ -224,7 +230,7 @@ void processResult(AutodetectResult result) {
224230
}
225231
CategoryDefinition categoryDefinition = result.getCategoryDefinition();
226232
if (categoryDefinition != null) {
227-
persister.persistCategoryDefinition(categoryDefinition, this::isAlive);
233+
processCategoryDefinition(categoryDefinition);
228234
}
229235
ModelPlot modelPlot = result.getModelPlot();
230236
if (modelPlot != null) {
@@ -308,6 +314,22 @@ void processResult(AutodetectResult result) {
308314
}
309315
}
310316

317+
private void processCategoryDefinition(CategoryDefinition categoryDefinition) {
318+
persister.persistCategoryDefinition(categoryDefinition, this::isAlive);
319+
if (categoryDefinition.getCategoryId() == EXCESSIVE_EARLY_CATEGORY_COUNT &&
320+
priorRunsBucketCount + currentRunBucketCount < EARLY_BUCKET_THRESHOLD &&
321+
excessiveCategoryWarningIssued == false) {
322+
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES, EXCESSIVE_EARLY_CATEGORY_COUNT,
323+
// Add 1 because category definitions are written before buckets
324+
1L + priorRunsBucketCount + currentRunBucketCount));
325+
// This flag won't be retained if the job is closed and reopened, or if the job migrates to another node.
326+
// This means it's possible the audit message is generated multiple times. However, that's not a
327+
// disaster, and is also very unlikely in the (best practice) cases where initial lookback covers
328+
// more than 100 buckets.
329+
excessiveCategoryWarningIssued = true;
330+
}
331+
}
332+
311333
private void processModelSizeStats(ModelSizeStats modelSizeStats) {
312334
LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}",
313335
jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(),

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java

Lines changed: 74 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public void testProcess() throws TimeoutException {
133133
verify(persister).commitStateWrites(JOB_ID);
134134
}
135135

136-
public void testProcessResult_bucket() throws Exception {
136+
public void testProcessResult_bucket() {
137137
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
138138
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
139139
AutodetectResult result = mock(AutodetectResult.class);
@@ -150,7 +150,7 @@ public void testProcessResult_bucket() throws Exception {
150150
verify(persister, never()).deleteInterimResults(JOB_ID);
151151
}
152152

153-
public void testProcessResult_bucket_deleteInterimRequired() throws Exception {
153+
public void testProcessResult_bucket_deleteInterimRequired() {
154154
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
155155
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
156156
AutodetectResult result = mock(AutodetectResult.class);
@@ -167,7 +167,7 @@ public void testProcessResult_bucket_deleteInterimRequired() throws Exception {
167167
verify(persister).deleteInterimResults(JOB_ID);
168168
}
169169

170-
public void testProcessResult_records() throws Exception {
170+
public void testProcessResult_records() {
171171
AutodetectResult result = mock(AutodetectResult.class);
172172
List<AnomalyRecord> records =
173173
Arrays.asList(
@@ -183,7 +183,7 @@ public void testProcessResult_records() throws Exception {
183183
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
184184
}
185185

186-
public void testProcessResult_influencers() throws Exception {
186+
public void testProcessResult_influencers() {
187187
AutodetectResult result = mock(AutodetectResult.class);
188188
List<Influencer> influencers =
189189
Arrays.asList(
@@ -199,9 +199,10 @@ public void testProcessResult_influencers() throws Exception {
199199
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
200200
}
201201

202-
public void testProcessResult_categoryDefinition() throws Exception {
202+
public void testProcessResult_categoryDefinition() {
203203
AutodetectResult result = mock(AutodetectResult.class);
204204
CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
205+
when(categoryDefinition.getCategoryId()).thenReturn(1L);
205206
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
206207

207208
processorUnderTest.setDeleteInterimRequired(false);
@@ -212,7 +213,66 @@ public void testProcessResult_categoryDefinition() throws Exception {
212213
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
213214
}
214215

215-
public void testProcessResult_flushAcknowledgement() throws Exception {
216+
public void testProcessResult_excessiveCategoryDefinitionCountEarly() {
217+
int iterations = 3;
218+
int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2;
219+
220+
processorUnderTest.setDeleteInterimRequired(false);
221+
222+
AutodetectResult result = mock(AutodetectResult.class);
223+
for (int iteration = 1; iteration <= iterations; ++iteration) {
224+
for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) {
225+
CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID);
226+
categoryDefinition.setCategoryId(categoryId);
227+
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
228+
229+
processorUnderTest.processResult(result);
230+
}
231+
}
232+
233+
verify(bulkBuilder, never()).executeRequest();
234+
verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any());
235+
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
236+
verify(auditor).warning(eq(JOB_ID), eq(Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES,
237+
AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT, 1)));
238+
}
239+
240+
public void testProcessResult_highCategoryDefinitionCountLateOn() {
241+
int iterations = 3;
242+
int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2;
243+
244+
processorUnderTest.setDeleteInterimRequired(false);
245+
246+
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
247+
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
248+
249+
AutodetectResult bucketResult = mock(AutodetectResult.class);
250+
final int numPriorBuckets = (int) AutodetectResultProcessor.EARLY_BUCKET_THRESHOLD + 1;
251+
for (int i = 0; i < numPriorBuckets; ++i) {
252+
Bucket bucket = new Bucket(JOB_ID, new Date(i * 1000 + 1000000), BUCKET_SPAN_MS);
253+
when(bucketResult.getBucket()).thenReturn(bucket);
254+
processorUnderTest.processResult(bucketResult);
255+
}
256+
257+
AutodetectResult categoryResult = mock(AutodetectResult.class);
258+
for (int iteration = 1; iteration <= iterations; ++iteration) {
259+
for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) {
260+
CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID);
261+
categoryDefinition.setCategoryId(categoryId);
262+
when(categoryResult.getCategoryDefinition()).thenReturn(categoryDefinition);
263+
processorUnderTest.processResult(categoryResult);
264+
}
265+
}
266+
267+
verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
268+
verify(bulkBuilder, times(numPriorBuckets)).persistBucket(any(Bucket.class));
269+
verify(bulkBuilder, times(numPriorBuckets)).executeRequest();
270+
verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any());
271+
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
272+
verify(auditor, never()).warning(eq(JOB_ID), anyString());
273+
}
274+
275+
public void testProcessResult_flushAcknowledgement() {
216276
AutodetectResult result = mock(AutodetectResult.class);
217277
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
218278
when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
@@ -228,12 +288,13 @@ public void testProcessResult_flushAcknowledgement() throws Exception {
228288
verify(bulkBuilder).executeRequest();
229289
}
230290

231-
public void testProcessResult_flushAcknowledgementMustBeProcessedLast() throws Exception {
291+
public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
232292
AutodetectResult result = mock(AutodetectResult.class);
233293
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
234294
when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
235295
when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
236296
CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
297+
when(categoryDefinition.getCategoryId()).thenReturn(1L);
237298
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
238299

239300
processorUnderTest.setDeleteInterimRequired(false);
@@ -248,7 +309,7 @@ public void testProcessResult_flushAcknowledgementMustBeProcessedLast() throws E
248309
inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
249310
}
250311

251-
public void testProcessResult_modelPlot() throws Exception {
312+
public void testProcessResult_modelPlot() {
252313
AutodetectResult result = mock(AutodetectResult.class);
253314
ModelPlot modelPlot = mock(ModelPlot.class);
254315
when(result.getModelPlot()).thenReturn(modelPlot);
@@ -260,7 +321,7 @@ public void testProcessResult_modelPlot() throws Exception {
260321
verify(bulkBuilder).persistModelPlot(modelPlot);
261322
}
262323

263-
public void testProcessResult_modelSizeStats() throws Exception {
324+
public void testProcessResult_modelSizeStats() {
264325
AutodetectResult result = mock(AutodetectResult.class);
265326
ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
266327
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
@@ -273,7 +334,7 @@ public void testProcessResult_modelSizeStats() throws Exception {
273334
verify(persister).persistModelSizeStats(eq(modelSizeStats), any());
274335
}
275336

276-
public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() throws Exception {
337+
public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() {
277338
TimeValue delay = TimeValue.timeValueSeconds(5);
278339
// Set up schedule delay time
279340
when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString()))
@@ -313,7 +374,7 @@ public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() throws Exc
313374
verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb"));
314375
}
315376

316-
public void testProcessResult_modelSnapshot() throws Exception {
377+
public void testProcessResult_modelSnapshot() {
317378
AutodetectResult result = mock(AutodetectResult.class);
318379
ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)
319380
.setSnapshotId("a_snapshot_id")
@@ -337,7 +398,7 @@ public void testProcessResult_modelSnapshot() throws Exception {
337398
verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any());
338399
}
339400

340-
public void testProcessResult_quantiles_givenRenormalizationIsEnabled() throws Exception {
401+
public void testProcessResult_quantiles_givenRenormalizationIsEnabled() {
341402
AutodetectResult result = mock(AutodetectResult.class);
342403
Quantiles quantiles = mock(Quantiles.class);
343404
when(result.getQuantiles()).thenReturn(quantiles);
@@ -354,7 +415,7 @@ public void testProcessResult_quantiles_givenRenormalizationIsEnabled() throws E
354415
verify(renormalizer).renormalize(quantiles);
355416
}
356417

357-
public void testProcessResult_quantiles_givenRenormalizationIsDisabled() throws Exception {
418+
public void testProcessResult_quantiles_givenRenormalizationIsDisabled() {
358419
AutodetectResult result = mock(AutodetectResult.class);
359420
Quantiles quantiles = mock(Quantiles.class);
360421
when(result.getQuantiles()).thenReturn(quantiles);

0 commit comments

Comments
 (0)