Skip to content

Commit 2cfe703

Browse files
[ML] Ensure immutability of MlMetadata (#31957)
The test failure in #31916 revealed that updating rules on a job was modifying the detectors list in-place. That meant the old cluster state and the updated cluster state had no difference and thus the change was not propagated to non-master nodes. This commit fixes that and also reviews all of ML metadata in order to ensure immutability. Closes #31916
1 parent e3707ef commit 2cfe703

File tree

10 files changed

+238
-69
lines changed

10 files changed

+238
-69
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -156,14 +156,14 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue
156156
this.jobId = jobId;
157157
this.queryDelay = queryDelay;
158158
this.frequency = frequency;
159-
this.indices = indices;
160-
this.types = types;
159+
this.indices = indices == null ? null : Collections.unmodifiableList(indices);
160+
this.types = types == null ? null : Collections.unmodifiableList(types);
161161
this.query = query;
162162
this.aggregations = aggregations;
163-
this.scriptFields = scriptFields;
163+
this.scriptFields = scriptFields == null ? null : Collections.unmodifiableList(scriptFields);
164164
this.scrollSize = scrollSize;
165165
this.chunkingConfig = chunkingConfig;
166-
this.headers = Objects.requireNonNull(headers);
166+
this.headers = Collections.unmodifiableMap(headers);
167167
}
168168

169169
public DatafeedConfig(StreamInput in) throws IOException {
@@ -172,19 +172,19 @@ public DatafeedConfig(StreamInput in) throws IOException {
172172
this.queryDelay = in.readOptionalTimeValue();
173173
this.frequency = in.readOptionalTimeValue();
174174
if (in.readBoolean()) {
175-
this.indices = in.readList(StreamInput::readString);
175+
this.indices = Collections.unmodifiableList(in.readList(StreamInput::readString));
176176
} else {
177177
this.indices = null;
178178
}
179179
if (in.readBoolean()) {
180-
this.types = in.readList(StreamInput::readString);
180+
this.types = Collections.unmodifiableList(in.readList(StreamInput::readString));
181181
} else {
182182
this.types = null;
183183
}
184184
this.query = in.readNamedWriteable(QueryBuilder.class);
185185
this.aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new);
186186
if (in.readBoolean()) {
187-
this.scriptFields = in.readList(SearchSourceBuilder.ScriptField::new);
187+
this.scriptFields = Collections.unmodifiableList(in.readList(SearchSourceBuilder.ScriptField::new));
188188
} else {
189189
this.scriptFields = null;
190190
}
@@ -195,7 +195,7 @@ public DatafeedConfig(StreamInput in) throws IOException {
195195
}
196196
this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new);
197197
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
198-
this.headers = in.readMap(StreamInput::readString, StreamInput::readString);
198+
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
199199
} else {
200200
this.headers = Collections.emptyMap();
201201
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java

+12
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,18 @@ public String toString() {
352352
return Strings.toString(this);
353353
}
354354

355+
boolean isNoop(DatafeedConfig datafeed) {
356+
return (frequency == null || Objects.equals(frequency, datafeed.getFrequency()))
357+
&& (queryDelay == null || Objects.equals(queryDelay, datafeed.getQueryDelay()))
358+
&& (indices == null || Objects.equals(indices, datafeed.getIndices()))
359+
&& (types == null || Objects.equals(types, datafeed.getTypes()))
360+
&& (query == null || Objects.equals(query, datafeed.getQuery()))
361+
&& (scrollSize == null || Objects.equals(scrollSize, datafeed.getQueryDelay()))
362+
&& (aggregations == null || Objects.equals(aggregations, datafeed.getAggregations()))
363+
&& (scriptFields == null || Objects.equals(scriptFields, datafeed.getScriptFields()))
364+
&& (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig()));
365+
}
366+
355367
public static class Builder {
356368

357369
private String id;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfig.java

+18-12
Original file line numberDiff line numberDiff line change
@@ -144,29 +144,29 @@ private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, Lis
144144
this.latency = latency;
145145
this.categorizationFieldName = categorizationFieldName;
146146
this.categorizationAnalyzerConfig = categorizationAnalyzerConfig;
147-
this.categorizationFilters = categorizationFilters;
147+
this.categorizationFilters = categorizationFilters == null ? null : Collections.unmodifiableList(categorizationFilters);
148148
this.summaryCountFieldName = summaryCountFieldName;
149-
this.influencers = influencers;
149+
this.influencers = Collections.unmodifiableList(influencers);
150150
this.overlappingBuckets = overlappingBuckets;
151151
this.resultFinalizationWindow = resultFinalizationWindow;
152152
this.multivariateByFields = multivariateByFields;
153-
this.multipleBucketSpans = multipleBucketSpans;
153+
this.multipleBucketSpans = multipleBucketSpans == null ? null : Collections.unmodifiableList(multipleBucketSpans);
154154
this.usePerPartitionNormalization = usePerPartitionNormalization;
155155
}
156156

157157
public AnalysisConfig(StreamInput in) throws IOException {
158158
bucketSpan = in.readTimeValue();
159159
categorizationFieldName = in.readOptionalString();
160-
categorizationFilters = in.readBoolean() ? in.readList(StreamInput::readString) : null;
160+
categorizationFilters = in.readBoolean() ? Collections.unmodifiableList(in.readList(StreamInput::readString)) : null;
161161
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
162162
categorizationAnalyzerConfig = in.readOptionalWriteable(CategorizationAnalyzerConfig::new);
163163
} else {
164164
categorizationAnalyzerConfig = null;
165165
}
166166
latency = in.readOptionalTimeValue();
167167
summaryCountFieldName = in.readOptionalString();
168-
detectors = in.readList(Detector::new);
169-
influencers = in.readList(StreamInput::readString);
168+
detectors = Collections.unmodifiableList(in.readList(Detector::new));
169+
influencers = Collections.unmodifiableList(in.readList(StreamInput::readString));
170170
overlappingBuckets = in.readOptionalBoolean();
171171
resultFinalizationWindow = in.readOptionalLong();
172172
multivariateByFields = in.readOptionalBoolean();
@@ -176,7 +176,7 @@ public AnalysisConfig(StreamInput in) throws IOException {
176176
for (int i = 0; i < arraySize; i++) {
177177
spans.add(in.readTimeValue());
178178
}
179-
multipleBucketSpans = spans;
179+
multipleBucketSpans = Collections.unmodifiableList(spans);
180180
} else {
181181
multipleBucketSpans = null;
182182
}
@@ -487,18 +487,20 @@ public Builder(List<Detector> detectors) {
487487
}
488488

489489
public Builder(AnalysisConfig analysisConfig) {
490-
this.detectors = analysisConfig.detectors;
490+
this.detectors = new ArrayList<>(analysisConfig.detectors);
491491
this.bucketSpan = analysisConfig.bucketSpan;
492492
this.latency = analysisConfig.latency;
493493
this.categorizationFieldName = analysisConfig.categorizationFieldName;
494-
this.categorizationFilters = analysisConfig.categorizationFilters;
494+
this.categorizationFilters = analysisConfig.categorizationFilters == null ? null
495+
: new ArrayList<>(analysisConfig.categorizationFilters);
495496
this.categorizationAnalyzerConfig = analysisConfig.categorizationAnalyzerConfig;
496497
this.summaryCountFieldName = analysisConfig.summaryCountFieldName;
497-
this.influencers = analysisConfig.influencers;
498+
this.influencers = new ArrayList<>(analysisConfig.influencers);
498499
this.overlappingBuckets = analysisConfig.overlappingBuckets;
499500
this.resultFinalizationWindow = analysisConfig.resultFinalizationWindow;
500501
this.multivariateByFields = analysisConfig.multivariateByFields;
501-
this.multipleBucketSpans = analysisConfig.multipleBucketSpans;
502+
this.multipleBucketSpans = analysisConfig.multipleBucketSpans == null ? null
503+
: new ArrayList<>(analysisConfig.multipleBucketSpans);
502504
this.usePerPartitionNormalization = analysisConfig.usePerPartitionNormalization;
503505
}
504506

@@ -518,6 +520,10 @@ public void setDetectors(List<Detector> detectors) {
518520
this.detectors = sequentialIndexDetectors;
519521
}
520522

523+
public void setDetector(int detectorIndex, Detector detector) {
524+
detectors.set(detectorIndex, detector);
525+
}
526+
521527
public void setBucketSpan(TimeValue bucketSpan) {
522528
this.bucketSpan = bucketSpan;
523529
}
@@ -543,7 +549,7 @@ public void setSummaryCountFieldName(String summaryCountFieldName) {
543549
}
544550

545551
public void setInfluencers(List<String> influencers) {
546-
this.influencers = influencers;
552+
this.influencers = ExceptionsHelper.requireNonNull(influencers, INFLUENCERS.getPreferredName());
547553
}
548554

549555
public void setOverlappingBuckets(Boolean overlappingBuckets) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ public Detector(StreamInput in) throws IOException {
252252
partitionFieldName = in.readOptionalString();
253253
useNull = in.readBoolean();
254254
excludeFrequent = in.readBoolean() ? ExcludeFrequent.readFromStream(in) : null;
255-
rules = in.readList(DetectionRule::new);
255+
rules = Collections.unmodifiableList(in.readList(DetectionRule::new));
256256
if (in.getVersion().onOrAfter(Version.V_5_5_0)) {
257257
detectorIndex = in.readInt();
258258
} else {
@@ -508,7 +508,7 @@ public Builder(Detector detector) {
508508
partitionFieldName = detector.partitionFieldName;
509509
useNull = detector.useNull;
510510
excludeFrequent = detector.excludeFrequent;
511-
rules = new ArrayList<>(detector.getRules());
511+
rules = new ArrayList<>(detector.rules);
512512
detectorIndex = detector.detectorIndex;
513513
}
514514

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ private Job(String jobId, String jobType, Version jobVersion, List<String> group
193193
this.jobId = jobId;
194194
this.jobType = jobType;
195195
this.jobVersion = jobVersion;
196-
this.groups = groups;
196+
this.groups = Collections.unmodifiableList(groups);
197197
this.description = description;
198198
this.createTime = createTime;
199199
this.finishedTime = finishedTime;
@@ -207,7 +207,7 @@ private Job(String jobId, String jobType, Version jobVersion, List<String> group
207207
this.backgroundPersistInterval = backgroundPersistInterval;
208208
this.modelSnapshotRetentionDays = modelSnapshotRetentionDays;
209209
this.resultsRetentionDays = resultsRetentionDays;
210-
this.customSettings = customSettings;
210+
this.customSettings = customSettings == null ? null : Collections.unmodifiableMap(customSettings);
211211
this.modelSnapshotId = modelSnapshotId;
212212
this.modelSnapshotMinVersion = modelSnapshotMinVersion;
213213
this.resultsIndexName = resultsIndexName;
@@ -223,7 +223,7 @@ public Job(StreamInput in) throws IOException {
223223
jobVersion = null;
224224
}
225225
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
226-
groups = in.readList(StreamInput::readString);
226+
groups = Collections.unmodifiableList(in.readList(StreamInput::readString));
227227
} else {
228228
groups = Collections.emptyList();
229229
}
@@ -244,7 +244,8 @@ public Job(StreamInput in) throws IOException {
244244
backgroundPersistInterval = in.readOptionalTimeValue();
245245
modelSnapshotRetentionDays = in.readOptionalLong();
246246
resultsRetentionDays = in.readOptionalLong();
247-
customSettings = in.readMap();
247+
Map<String, Object> readCustomSettings = in.readMap();
248+
customSettings = readCustomSettings == null ? null : Collections.unmodifiableMap(readCustomSettings);
248249
modelSnapshotId = in.readOptionalString();
249250
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1) && in.readBoolean()) {
250251
modelSnapshotMinVersion = Version.readVersion(in);
@@ -627,7 +628,8 @@ public boolean equals(Object other) {
627628
&& Objects.equals(this.lastDataTime, that.lastDataTime)
628629
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory)
629630
&& Objects.equals(this.analysisConfig, that.analysisConfig)
630-
&& Objects.equals(this.analysisLimits, that.analysisLimits) && Objects.equals(this.dataDescription, that.dataDescription)
631+
&& Objects.equals(this.analysisLimits, that.analysisLimits)
632+
&& Objects.equals(this.dataDescription, that.dataDescription)
631633
&& Objects.equals(this.modelPlotConfig, that.modelPlotConfig)
632634
&& Objects.equals(this.renormalizationWindowDays, that.renormalizationWindowDays)
633635
&& Objects.equals(this.backgroundPersistInterval, that.backgroundPersistInterval)
@@ -1055,6 +1057,7 @@ public boolean equals(Object o) {
10551057
return Objects.equals(this.id, that.id)
10561058
&& Objects.equals(this.jobType, that.jobType)
10571059
&& Objects.equals(this.jobVersion, that.jobVersion)
1060+
&& Objects.equals(this.groups, that.groups)
10581061
&& Objects.equals(this.description, that.description)
10591062
&& Objects.equals(this.analysisConfig, that.analysisConfig)
10601063
&& Objects.equals(this.analysisLimits, that.analysisLimits)
@@ -1077,7 +1080,7 @@ public boolean equals(Object o) {
10771080

10781081
@Override
10791082
public int hashCode() {
1080-
return Objects.hash(id, jobType, jobVersion, description, analysisConfig, analysisLimits, dataDescription, createTime,
1083+
return Objects.hash(id, jobType, jobVersion, groups, description, analysisConfig, analysisLimits, dataDescription, createTime,
10811084
finishedTime, lastDataTime, establishedModelMemory, modelPlotConfig, renormalizationWindowDays,
10821085
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId,
10831086
modelSnapshotMinVersion, resultsIndexName, deleted);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java

+49-12
Original file line numberDiff line numberDiff line change
@@ -373,33 +373,33 @@ public Set<String> getUpdateFields() {
373373
*/
374374
public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) {
375375
Job.Builder builder = new Job.Builder(source);
376+
AnalysisConfig currentAnalysisConfig = source.getAnalysisConfig();
377+
AnalysisConfig.Builder newAnalysisConfig = new AnalysisConfig.Builder(currentAnalysisConfig);
378+
376379
if (groups != null) {
377380
builder.setGroups(groups);
378381
}
379382
if (description != null) {
380383
builder.setDescription(description);
381384
}
382385
if (detectorUpdates != null && detectorUpdates.isEmpty() == false) {
383-
AnalysisConfig ac = source.getAnalysisConfig();
384-
int numDetectors = ac.getDetectors().size();
386+
int numDetectors = currentAnalysisConfig.getDetectors().size();
385387
for (DetectorUpdate dd : detectorUpdates) {
386388
if (dd.getDetectorIndex() >= numDetectors) {
387389
throw ExceptionsHelper.badRequestException("Supplied detector_index [{}] is >= the number of detectors [{}]",
388390
dd.getDetectorIndex(), numDetectors);
389391
}
390392

391-
Detector.Builder detectorbuilder = new Detector.Builder(ac.getDetectors().get(dd.getDetectorIndex()));
393+
Detector.Builder detectorBuilder = new Detector.Builder(currentAnalysisConfig.getDetectors().get(dd.getDetectorIndex()));
392394
if (dd.getDescription() != null) {
393-
detectorbuilder.setDetectorDescription(dd.getDescription());
395+
detectorBuilder.setDetectorDescription(dd.getDescription());
394396
}
395397
if (dd.getRules() != null) {
396-
detectorbuilder.setRules(dd.getRules());
398+
detectorBuilder.setRules(dd.getRules());
397399
}
398-
ac.getDetectors().set(dd.getDetectorIndex(), detectorbuilder.build());
399-
}
400400

401-
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(ac);
402-
builder.setAnalysisConfig(acBuilder);
401+
newAnalysisConfig.setDetector(dd.getDetectorIndex(), detectorBuilder.build());
402+
}
403403
}
404404
if (modelPlotConfig != null) {
405405
builder.setModelPlotConfig(modelPlotConfig);
@@ -422,9 +422,7 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) {
422422
builder.setResultsRetentionDays(resultsRetentionDays);
423423
}
424424
if (categorizationFilters != null) {
425-
AnalysisConfig.Builder analysisConfigBuilder = new AnalysisConfig.Builder(source.getAnalysisConfig());
426-
analysisConfigBuilder.setCategorizationFilters(categorizationFilters);
427-
builder.setAnalysisConfig(analysisConfigBuilder);
425+
newAnalysisConfig.setCategorizationFilters(categorizationFilters);
428426
}
429427
if (customSettings != null) {
430428
builder.setCustomSettings(customSettings);
@@ -446,9 +444,48 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) {
446444
if (jobVersion != null) {
447445
builder.setJobVersion(jobVersion);
448446
}
447+
448+
builder.setAnalysisConfig(newAnalysisConfig);
449449
return builder.build();
450450
}
451451

452+
boolean isNoop(Job job) {
453+
return (groups == null || Objects.equals(groups, job.getGroups()))
454+
&& (description == null || Objects.equals(description, job.getDescription()))
455+
&& (modelPlotConfig == null || Objects.equals(modelPlotConfig, job.getModelPlotConfig()))
456+
&& (analysisLimits == null || Objects.equals(analysisLimits, job.getAnalysisLimits()))
457+
&& updatesDetectors(job) == false
458+
&& (renormalizationWindowDays == null || Objects.equals(renormalizationWindowDays, job.getRenormalizationWindowDays()))
459+
&& (backgroundPersistInterval == null || Objects.equals(backgroundPersistInterval, job.getBackgroundPersistInterval()))
460+
&& (modelSnapshotRetentionDays == null || Objects.equals(modelSnapshotRetentionDays, job.getModelSnapshotRetentionDays()))
461+
&& (resultsRetentionDays == null || Objects.equals(resultsRetentionDays, job.getResultsRetentionDays()))
462+
&& (categorizationFilters == null
463+
|| Objects.equals(categorizationFilters, job.getAnalysisConfig().getCategorizationFilters()))
464+
&& (customSettings == null || Objects.equals(customSettings, job.getCustomSettings()))
465+
&& (modelSnapshotId == null || Objects.equals(modelSnapshotId, job.getModelSnapshotId()))
466+
&& (modelSnapshotMinVersion == null || Objects.equals(modelSnapshotMinVersion, job.getModelSnapshotMinVersion()))
467+
&& (establishedModelMemory == null || Objects.equals(establishedModelMemory, job.getEstablishedModelMemory()))
468+
&& (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion()));
469+
}
470+
471+
boolean updatesDetectors(Job job) {
472+
AnalysisConfig analysisConfig = job.getAnalysisConfig();
473+
if (detectorUpdates == null) {
474+
return false;
475+
}
476+
for (DetectorUpdate detectorUpdate : detectorUpdates) {
477+
if (detectorUpdate.description == null && detectorUpdate.rules == null) {
478+
continue;
479+
}
480+
Detector detector = analysisConfig.getDetectors().get(detectorUpdate.detectorIndex);
481+
if (Objects.equals(detectorUpdate.description, detector.getDetectorDescription()) == false
482+
|| Objects.equals(detectorUpdate.rules, detector.getRules()) == false) {
483+
return true;
484+
}
485+
}
486+
return false;
487+
}
488+
452489
@Override
453490
public boolean equals(Object other) {
454491
if (this == other) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/RuleScope.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public RuleScope() {
6060
}
6161

6262
public RuleScope(Map<String, FilterRef> scope) {
63-
this.scope = Objects.requireNonNull(scope);
63+
this.scope = Collections.unmodifiableMap(scope);
6464
}
6565

6666
public RuleScope(StreamInput in) throws IOException {

0 commit comments

Comments
 (0)