Commit e294056

[ML] Merge the Jindex master feature branch (#36702)
* [ML] Job and datafeed mappings with index template (#32719)
  Index mappings for the configuration documents.
* [ML] Job config document CRUD operations (#32738)
* [ML] Datafeed config CRUD operations (#32854)
* [ML] Change JobManager to work with Job config in index (#33064)
* [ML] Change Datafeed actions to read config from the config index (#33273)
* [ML] Allocate jobs based on JobParams rather than cluster state config (#33994)
* [ML] Return missing job error when .ml-config does not exist (#34177)
* [ML] Close job in index (#34217)
* [ML] Adjust finalize job action to work with documents (#34226)
* [ML] Job in index: Datafeed node selector (#34218)
* [ML] Job in Index: Stop and preview datafeed (#34605)
* [ML] Delete job document (#34595)
* [ML] Convert job data remover to work with index configs (#34532)
* [ML] Job in index: Get datafeed and job stats from index (#34645)
* [ML] Job in Index: Convert get calendar events to index docs (#34710)
* [ML] Job in index: delete filter action (#34642)
  Changes the delete filter action to search the index, rather than the cluster state, for jobs that use the filter being deleted.
* [ML] Job in Index: Enable integ tests (#34851)
  Enables the ML integration tests, excluding the rolling upgrade tests, with many fixes to make the tests pass again.
* [ML] Reimplement established model memory (#35500)
  This is the 7.0 implementation of a master node service that keeps track of the native process memory requirement of each ML job with an associated native process. The new ML memory tracker service works once the whole cluster has been upgraded to at least version 6.6. For mixed-version clusters the old mechanism, established model memory stored on the job in cluster state, is used. This means the old (and complex) code that kept established model memory up to date on the job object has been removed in 7.0.
  Forward port of #35263
* [ML] Need to wait for shards to replicate in distributed test (#35541)
  Because the cluster was expanded from 1 node to 3, indices would initially start off with 0 replicas. If the original node was killed before auto-expansion to 1 replica completed, the test would fail because the indices would be unavailable.
* [ML] DelayedDataCheckConfig index mappings (#35646)
* [ML] JIndex: Restore finalize job action (#35939)
* [ML] Replace Version.CURRENT in streaming functions (#36118)
* [ML] Use 'anomaly-detector' in job config doc name (#36254)
* [ML] Job In Index: Migrate config from the clusterstate (#35834)
  Migrates ML configuration from cluster state to the index for closed jobs, and only once all nodes are v6.6.0 or higher.
* [ML] Check groups against job Ids on update (#36317)
* [ML] Adapt to periodic persistent task refresh (#36633)
  If https://github.com/elastic/elasticsearch/pull/36069/files is merged, the approach for reallocating ML persistent tasks after refreshing job memory requirements can be simplified. This change begins the simplification process.
* Remove AwaitsFix and implement TODO
* [ML] Default search size for configs
* Fix TooManyJobsIT.testMultipleNodes
  Two problems: 1. a stack overflow during async iteration when many jobs run on the same machine; 2. the search size was not being set effectively in all cases.
* Use execute() instead of submit() in MlMemoryTracker
  We don't need a Future to wait for completion.
* [ML][TEST] Fix NPE in JobManagerTests
* [ML] JIndex: Limit the size of bulk migrations (#36481)
* [ML] Prevent updates and upgrade tests (#36649)
* [FEATURE][ML] Add cluster setting that enables/disables config migration (#36700)
  Adds a cluster setting called `xpack.ml.enable_config_migration`, `true` by default. When set to `false`, no config migration is attempted and non-migrated resources (e.g. jobs, datafeeds) can be updated normally. Relates #32905
* [ML] Snapshot ml configs before migrating (#36645)
* [FEATURE][ML] Split in batches and migrate all jobs and datafeeds (#36716)
  Relates #32905
* SQL: Fix translation of LIKE/RLIKE keywords (#36672)
  Refactors the Like/RLike functions to simplify their internals and improve query translation when chained or within a script context. Fix #36039, fix #36584
* Fixing line length for EnvironmentTests and RecoveryTests (#36657)
  Relates #34884
* Add back one line removed by mistake regarding the java version check and COMPAT jvm parameter existence
* Do not resolve addresses in remote connection info (#36671)
  The remote connection info API resolves the addresses of seed nodes when invoked. This is problematic: if a hostname fails to resolve, no remote connection info is displayed at all, yet a hostname failing to resolve can happen across remote clusters, especially in the modern world of cloud services with dynamically changing IPs. Instead, the remote connection info API should report the configured seed nodes. This commit changes it to display the configured seed nodes, avoiding hostname resolution. Note that care was taken to preserve backwards compatibility with previous versions, which expect the remote connection info to serialize a transport address instead of a string representing the hostname.
* [Painless] Add boxed type to boxed type casts for method/return (#36571)
  Adds implicit boxed-type to boxed-type casts for non-def types, creating asymmetric casting relative to the def type when calling methods or returning values. A user calling a method that takes an Integer can therefore legally call it with a Byte, Short, etc., matching the way def works. This creates a consistency in the casting model that did not previously exist.
* SNAPSHOTS: Adjust BwC Versions in Restore Logic (#36718)
  Re-enables bwc tests with adjusted version conditions, now that #36397 enables concurrent snapshots in 6.6+.
* ingest: fix on_failure with Drop processor (#36686)
  Allows a document to be dropped when a Drop processor is used in the on_failure fork of the processor chain. Fixes #36151
* Initialize startup `CcrRepositories` (#36730)
  Currently the CcrRepositoryManager only listens for settings updates and installs new repositories; it does not install the repositories that are in the initial settings. This commit modifies the manager to install the initial repositories. It also modifies the CCR integration test to configure the remote leader node at startup instead of via a settings update.
* [TEST] fix float comparison in RandomObjects#getExpectedParsedValue
  Fixes a test bug introduced with #36597 that caused some test failures, as stored field value comparisons would not work when the CBOR xcontent type was used. Closes #29080
* [Geo] Integrate Lucene's LatLonShape (BKD Backed GeoShapes) as default `geo_shape` indexing approach (#35320)
  Exposes Lucene's LatLonShape field as the default type in GeoShapeFieldMapper. To use the new indexing approach, simply set "type" : "geo_shape" in the mappings without setting any of the strategy, precision, tree_levels, or distance_error_pct parameters. Note the following when using the new indexing approach: the geo_shape query does not support querying by MULTIPOINT; LINESTRING and MULTILINESTRING queries do not yet support the WITHIN relation; the CONTAINS relation is not yet supported. The tree, precision, tree_levels, distance_error_pct, and points_only parameters are deprecated.
* TESTS: Debug Log. IndexStatsIT#testFilterCacheStats
* ingest: support default pipelines + bulk upserts (#36618)
  Adds support for bulk upserts to use an index's default pipeline. Bulk upsert, doc_as_upsert, and script_as_upsert are all supported. However, bulk script_as_upsert has slightly surprising behavior, since the pipeline is executed _before_ the script is evaluated: the pipeline only has access to the data found in the upsert field of the script_as_upsert. The non-bulk script_as_upsert (existing behavior) runs the pipeline _after_ the script is executed. This commit does _not_ attempt to consolidate the bulk and non-bulk behavior for script_as_upsert, and it adds additional testing for the non-bulk behavior, which remains unchanged. Fixes #36219
* Fix duplicate phrase in shrink/split error message (#36734)
  Removes a duplicated "must be a" from the shrink/split error messages.
* Deprecate types in get_source and exist_source (#36426)
  Adds a new untyped endpoint `{index}/_source/{id}` for both the GET and HEAD methods, to get the source of a document or check for its existence. It also adds deprecation warnings to RestGetSourceAction that are emitted when the old deprecated "type" parameter is still used, and updates documentation and tests where appropriate. Relates to #35190
* Revert "[Geo] Integrate Lucene's LatLonShape (BKD Backed GeoShapes) as default `geo_shape` indexing approach (#35320)"
  This reverts commit 5bc7822.
* Enhance Invalidate Token API (#35388)
  This change:
  - adds functionality to invalidate all (refresh+access) tokens for all users of a realm;
  - adds functionality to invalidate all (refresh+access) tokens for a user in all realms;
  - adds functionality to invalidate all (refresh+access) tokens for a user in a specific realm;
  - changes the response format of the invalidate token API to contain information about the number of invalidated tokens and any errors that were encountered;
  - updates the API documentation.
  After back-porting to 6.x, the `created` field will be removed from master as a field in the response. Resolves #35115, relates #34556
* Add raw sort values to SearchSortValues transport serialization (#36617)
  In order for the CCS alternate execution mode (see #32125) to do the final reduction step on the CCS coordinating node, we need to serialize additional info in the transport layer as part of each `SearchHit`. Sort values are already present, but formatted according to the provided `DocValueFormat`. The CCS node needs to be able to reconstruct the Lucene `FieldDoc` to include in the `TopFieldDocs` and `CollapseTopFieldDocs` that feed the `mergeTopDocs` method used to reduce multiple search responses (one per cluster) into one. This commit adds that information to `SearchSortValues` and exposes it through a new getter method added to `SearchHit`. The info is only serialized at transport and never printed out at REST.
* Watcher: Ensure all internal search requests count hits (#36697)
  In previous commits only the stored toXContent version of a search request used the old format; an executed search request was already disabling hit counts. In 7.0 hit counts will stay enabled by default to allow for proper migration. Closes #36177
* [TEST] Ensure shard follow tasks have really stopped.
  Relates to #36696
* Ensure MapperService#getAllMetaFields elements order is deterministic (#36739)
  MapperService#getAllMetaFields returns an array created from an `ObjectHashSet`. Such a set does not guarantee deterministic hash ordering, so the array returned by its toArray may be sorted differently on each run. This caused repeatability issues in our tests (see #29080), as we pick random fields from the array of possible metadata fields, which is not repeatable if the input array is sorted differently on every run. Once the test seed is set, hppc picks it up and the sorting is deterministic, but failures don't reproduce with the seed that was originally printed out (as a seed was not originally set). See also https://issues.carrot2.org/projects/HPPC/issues/HPPC-173. With this commit, we simply create a static sorted array that is used for `getAllMetaFields`. The change is in production code but really affects only testing, as the only production usage of this method was to iterate through all values when parsing fields in the high-level REST client code. In any case this seems like a good change, as returning an array implies that it is deterministically sorted.
* Expose Sequence Number based Optimistic Concurrency Control in the rest layer (#36721)
  Relates #36148, relates #10708
* [ML] Mute MlDistributedFailureIT
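The config-migration entries above gate migration on two conditions: the `xpack.ml.enable_config_migration` cluster setting must not be `false`, and every node must be at least v6.6.0. That gating logic can be sketched in isolation as follows; `ConfigMigrationGate`, `canMigrate`, and `atLeast` are illustrative names for this standalone example, not the actual Elasticsearch implementation:

```java
import java.util.List;

public final class ConfigMigrationGate {
    // Hypothetical helper: compares "major.minor.patch" version strings numerically.
    static boolean atLeast(String version, String min) {
        String[] v = version.split("\\."), m = min.split("\\.");
        for (int i = 0; i < 3; i++) {
            int cmp = Integer.compare(Integer.parseInt(v[i]), Integer.parseInt(m[i]));
            if (cmp != 0) {
                return cmp > 0;
            }
        }
        return true;
    }

    // Migration runs only if the setting is enabled AND all nodes are >= 6.6.0.
    static boolean canMigrate(boolean enableConfigMigration, List<String> nodeVersions) {
        return enableConfigMigration
            && nodeVersions.stream().allMatch(v -> atLeast(v, "6.6.0"));
    }

    public static void main(String[] args) {
        // A mixed 6.5/6.6 cluster must not migrate yet.
        assert !canMigrate(true, List.of("6.6.0", "6.5.4"));
        assert canMigrate(true, List.of("6.6.0", "6.7.0"));
        // The cluster setting can veto migration entirely.
        assert !canMigrate(false, List.of("6.6.0"));
    }
}
```

The real service additionally restricts migration to closed jobs and stopped datafeeds, and snapshots the configs first (#36645); this sketch covers only the version/setting gate.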
1 parent ea9b08d commit e294056

File tree

149 files changed (+9341, -3081 lines)


client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java

Lines changed: 3 additions & 28 deletions
@@ -59,7 +59,6 @@ public class Job implements ToXContentObject {
     public static final ParseField DATA_DESCRIPTION = new ParseField("data_description");
     public static final ParseField DESCRIPTION = new ParseField("description");
     public static final ParseField FINISHED_TIME = new ParseField("finished_time");
-    public static final ParseField ESTABLISHED_MODEL_MEMORY = new ParseField("established_model_memory");
     public static final ParseField MODEL_PLOT_CONFIG = new ParseField("model_plot_config");
     public static final ParseField RENORMALIZATION_WINDOW_DAYS = new ParseField("renormalization_window_days");
     public static final ParseField BACKGROUND_PERSIST_INTERVAL = new ParseField("background_persist_interval");
@@ -84,7 +83,6 @@ public class Job implements ToXContentObject {
             (p) -> TimeUtil.parseTimeField(p, FINISHED_TIME.getPreferredName()),
             FINISHED_TIME,
             ValueType.VALUE);
-        PARSER.declareLong(Builder::setEstablishedModelMemory, ESTABLISHED_MODEL_MEMORY);
         PARSER.declareObject(Builder::setAnalysisConfig, AnalysisConfig.PARSER, ANALYSIS_CONFIG);
         PARSER.declareObject(Builder::setAnalysisLimits, AnalysisLimits.PARSER, ANALYSIS_LIMITS);
         PARSER.declareObject(Builder::setDataDescription, DataDescription.PARSER, DATA_DESCRIPTION);
@@ -107,7 +105,6 @@ public class Job implements ToXContentObject {
     private final String description;
     private final Date createTime;
     private final Date finishedTime;
-    private final Long establishedModelMemory;
     private final AnalysisConfig analysisConfig;
     private final AnalysisLimits analysisLimits;
     private final DataDescription dataDescription;
@@ -122,7 +119,7 @@ public class Job implements ToXContentObject {
     private final Boolean deleting;
 
     private Job(String jobId, String jobType, List<String> groups, String description,
-                Date createTime, Date finishedTime, Long establishedModelMemory,
+                Date createTime, Date finishedTime,
                 AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
                 ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
                 Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
@@ -134,7 +131,6 @@ private Job(String jobId, String jobType, List<String> groups, String descriptio
         this.description = description;
         this.createTime = createTime;
         this.finishedTime = finishedTime;
-        this.establishedModelMemory = establishedModelMemory;
         this.analysisConfig = analysisConfig;
         this.analysisLimits = analysisLimits;
         this.dataDescription = dataDescription;
@@ -204,16 +200,6 @@ public Date getFinishedTime() {
         return finishedTime;
     }
 
-    /**
-     * The established model memory of the job, or <code>null</code> if model
-     * memory has not reached equilibrium yet.
-     *
-     * @return The established model memory of the job
-     */
-    public Long getEstablishedModelMemory() {
-        return establishedModelMemory;
-    }
-
     /**
      * The analysis configuration object
      *
@@ -306,9 +292,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
             builder.timeField(FINISHED_TIME.getPreferredName(), FINISHED_TIME.getPreferredName() + humanReadableSuffix,
                 finishedTime.getTime());
         }
-        if (establishedModelMemory != null) {
-            builder.field(ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory);
-        }
         builder.field(ANALYSIS_CONFIG.getPreferredName(), analysisConfig, params);
         if (analysisLimits != null) {
             builder.field(ANALYSIS_LIMITS.getPreferredName(), analysisLimits, params);
@@ -364,7 +347,6 @@ public boolean equals(Object other) {
             && Objects.equals(this.description, that.description)
             && Objects.equals(this.createTime, that.createTime)
             && Objects.equals(this.finishedTime, that.finishedTime)
-            && Objects.equals(this.establishedModelMemory, that.establishedModelMemory)
             && Objects.equals(this.analysisConfig, that.analysisConfig)
             && Objects.equals(this.analysisLimits, that.analysisLimits)
             && Objects.equals(this.dataDescription, that.dataDescription)
@@ -381,7 +363,7 @@ public boolean equals(Object other) {
 
     @Override
     public int hashCode() {
-        return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, establishedModelMemory,
+        return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime,
             analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
             backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
             modelSnapshotId, resultsIndexName, deleting);
@@ -407,7 +389,6 @@ public static class Builder {
         private DataDescription dataDescription;
         private Date createTime;
         private Date finishedTime;
-        private Long establishedModelMemory;
         private ModelPlotConfig modelPlotConfig;
         private Long renormalizationWindowDays;
         private TimeValue backgroundPersistInterval;
@@ -435,7 +416,6 @@ public Builder(Job job) {
            this.dataDescription = job.getDataDescription();
            this.createTime = job.getCreateTime();
            this.finishedTime = job.getFinishedTime();
-            this.establishedModelMemory = job.getEstablishedModelMemory();
            this.modelPlotConfig = job.getModelPlotConfig();
            this.renormalizationWindowDays = job.getRenormalizationWindowDays();
            this.backgroundPersistInterval = job.getBackgroundPersistInterval();
@@ -496,11 +476,6 @@ Builder setFinishedTime(Date finishedTime) {
             return this;
         }
 
-        public Builder setEstablishedModelMemory(Long establishedModelMemory) {
-            this.establishedModelMemory = establishedModelMemory;
-            return this;
-        }
-
         public Builder setDataDescription(DataDescription.Builder description) {
             dataDescription = Objects.requireNonNull(description, DATA_DESCRIPTION.getPreferredName()).build();
             return this;
@@ -555,7 +530,7 @@ public Job build() {
             Objects.requireNonNull(id, "[" + ID.getPreferredName() + "] must not be null");
             Objects.requireNonNull(jobType, "[" + JOB_TYPE.getPreferredName() + "] must not be null");
             return new Job(
-                id, jobType, groups, description, createTime, finishedTime, establishedModelMemory,
+                id, jobType, groups, description, createTime, finishedTime,
                 analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
                 backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
                 modelSnapshotId, resultsIndexName, deleting);

client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java

Lines changed: 0 additions & 3 deletions
@@ -125,9 +125,6 @@ public static Job.Builder createRandomizedJobBuilder() {
         if (randomBoolean()) {
             builder.setFinishedTime(new Date(randomNonNegativeLong()));
         }
-        if (randomBoolean()) {
-            builder.setEstablishedModelMemory(randomNonNegativeLong());
-        }
         builder.setAnalysisConfig(AnalysisConfigTests.createRandomized());
         builder.setAnalysisLimits(AnalysisLimitsTests.createRandomized());

docs/reference/ml/apis/jobresource.asciidoc

Lines changed: 0 additions & 5 deletions
@@ -42,11 +42,6 @@ so do not set the `background_persist_interval` value too low.
 `description`::
   (string) An optional description of the job.
 
-`established_model_memory`::
-  (long) The approximate amount of memory resources that have been used for
-  analytical processing. This field is present only when the analytics have used
-  a stable amount of memory for several consecutive buckets.
-
 `finished_time`::
   (string) If the job closed or failed, this is the time the job finished,
   otherwise it is `null`. This property is informational; you cannot change its

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

Lines changed: 5 additions & 4 deletions
@@ -65,6 +65,7 @@
 import org.elasticsearch.xpack.core.logstash.LogstashFeatureSetUsage;
 import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
+import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
 import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
 import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction;
@@ -363,9 +364,9 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
                 new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new),
                 new NamedWriteableRegistry.Entry(NamedDiff.class, "ml", MlMetadata.MlMetadataDiff::new),
                 // ML - Persistent action requests
-                new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME,
+                new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
                         StartDatafeedAction.DatafeedParams::new),
-                new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME,
+                new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
                         OpenJobAction.JobParams::new),
                 // ML - Task states
                 new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new),
@@ -433,9 +434,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
                 new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("ml"),
                         parser -> MlMetadata.LENIENT_PARSER.parse(parser, null).build()),
                 // ML - Persistent action requests
-                new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(StartDatafeedAction.TASK_NAME),
+                new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.DATAFEED_TASK_NAME),
                         StartDatafeedAction.DatafeedParams::fromXContent),
-                new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.TASK_NAME),
+                new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.JOB_TASK_NAME),
                         OpenJobAction.JobParams::fromXContent),
                 // ML - Task states
                 new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent),
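The change above matters because the same task-name string keys two independent lookups, the NamedWriteableRegistry used for wire deserialization and the NamedXContentRegistry used for parsing; centralizing the names in `MlTasks` keeps the two registrations from drifting apart. A minimal sketch of that keyed-registry pattern, with hypothetical names (an illustration of the idea, not Elasticsearch code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public final class NamedRegistrySketch {
    // One shared constant used by every registry, mirroring MlTasks.JOB_TASK_NAME.
    static final String JOB_TASK_NAME = "xpack/ml/job";

    // Two independent registries keyed by the same name, standing in for
    // NamedWriteableRegistry and NamedXContentRegistry.
    static final Map<String, Function<String, String>> wireReaders = new HashMap<>();
    static final Map<String, Function<String, String>> xContentParsers = new HashMap<>();

    static void register() {
        wireReaders.put(JOB_TASK_NAME, payload -> "params-from-wire:" + payload);
        xContentParsers.put(JOB_TASK_NAME, payload -> "params-from-json:" + payload);
    }

    public static void main(String[] args) {
        register();
        // Both lookups resolve because they share the single constant; a typo in
        // either registration would make one of these lookups return null.
        assert wireReaders.get(JOB_TASK_NAME).apply("p").equals("params-from-wire:p");
        assert xContentParsers.get(JOB_TASK_NAME).apply("p").equals("params-from-json:p");
    }
}
```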

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetaIndex.java

Lines changed: 0 additions & 2 deletions
@@ -21,8 +21,6 @@ public final class MlMetaIndex {
      */
     public static final String INDEX_NAME = ".ml-meta";
 
-    public static final String INCLUDE_TYPE_KEY = "include_type";
-
     public static final String TYPE = "doc";
 
     private MlMetaIndex() {}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java

Lines changed: 11 additions & 10 deletions
@@ -5,7 +5,6 @@
  */
 package org.elasticsearch.xpack.core.ml;
 
-import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.AbstractDiffable;
@@ -146,7 +145,6 @@ public MlMetadata(StreamInput in) throws IOException {
             datafeeds.put(in.readString(), new DatafeedConfig(in));
         }
         this.datafeeds = datafeeds;
-
         this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
     }
 
@@ -167,7 +165,7 @@ private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOut
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         DelegatingMapParams extendedParams =
-                new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_CLUSTER_STATE, "true"), params);
+                new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params);
         mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
         mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
         return builder;
@@ -196,9 +194,14 @@ public MlMetadataDiff(StreamInput in) throws IOException {
             this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new,
                     MlMetadataDiff::readJobDiffFrom);
             this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new,
-                    MlMetadataDiff::readSchedulerDiffFrom);
+                    MlMetadataDiff::readDatafeedDiffFrom);
         }
 
+        /**
+         * Merge the diff with the ML metadata.
+         * @param part The current ML metadata.
+         * @return The new ML metadata.
+         */
         @Override
         public MetaData.Custom apply(MetaData.Custom part) {
             TreeMap<String, Job> newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs));
@@ -221,7 +224,7 @@ static Diff<Job> readJobDiffFrom(StreamInput in) throws IOException {
             return AbstractDiffable.readDiffFrom(Job::new, in);
         }
 
-        static Diff<DatafeedConfig> readSchedulerDiffFrom(StreamInput in) throws IOException {
+        static Diff<DatafeedConfig> readDatafeedDiffFrom(StreamInput in) throws IOException {
             return AbstractDiffable.readDiffFrom(DatafeedConfig::new, in);
         }
     }
@@ -295,7 +298,7 @@ public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) {
 
         public Builder putDatafeed(DatafeedConfig datafeedConfig, Map<String, String> headers) {
             if (datafeeds.containsKey(datafeedConfig.getId())) {
-                throw new ResourceAlreadyExistsException("A datafeed with id [" + datafeedConfig.getId() + "] already exists");
+                throw ExceptionsHelper.datafeedAlreadyExists(datafeedConfig.getId());
             }
             String jobId = datafeedConfig.getJobId();
             checkJobIsAvailableForDatafeed(jobId);
@@ -369,14 +372,14 @@ private void checkDatafeedIsStopped(Supplier<String> msg, String datafeedId, Per
             }
         }
 
-        private Builder putJobs(Collection<Job> jobs) {
+        public Builder putJobs(Collection<Job> jobs) {
             for (Job job : jobs) {
                 putJob(job, true);
             }
             return this;
         }
 
-        private Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
+        public Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
             for (DatafeedConfig datafeed : datafeeds) {
                 this.datafeeds.put(datafeed.getId(), datafeed);
             }
@@ -421,8 +424,6 @@ void checkJobHasNoDatafeed(String jobId) {
             }
         }
 
-
-
         public static MlMetadata getMlMetadata(ClusterState state) {
             MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(TYPE);
             if (mlMetadata == null) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

Lines changed: 73 additions & 2 deletions
@@ -12,8 +12,19 @@
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 public final class MlTasks {
 
+    public static final String JOB_TASK_NAME = "xpack/ml/job";
+    public static final String DATAFEED_TASK_NAME = "xpack/ml/datafeed";
+
+    private static final String JOB_TASK_ID_PREFIX = "job-";
+    private static final String DATAFEED_TASK_ID_PREFIX = "datafeed-";
+
     private MlTasks() {
     }
 
@@ -22,15 +33,15 @@ private MlTasks() {
      * A datafeed id can be used as a job id, because they are stored separately in cluster state.
      */
     public static String jobTaskId(String jobId) {
-        return "job-" + jobId;
+        return JOB_TASK_ID_PREFIX + jobId;
     }
 
     /**
      * Namespaces the task ids for datafeeds.
      * A job id can be used as a datafeed id, because they are stored separately in cluster state.
      */
     public static String datafeedTaskId(String datafeedId) {
-        return "datafeed-" + datafeedId;
+        return DATAFEED_TASK_ID_PREFIX + datafeedId;
     }
 
     @Nullable
@@ -67,4 +78,64 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
             return DatafeedState.STOPPED;
         }
     }
+
+    /**
+     * The job Ids of anomaly detector job tasks.
+     * All anomaly detector jobs are returned regardless of the status of the
+     * task (OPEN, CLOSED, FAILED etc).
+     *
+     * @param tasks Persistent tasks. If null an empty set is returned.
+     * @return The job Ids of anomaly detector job tasks
+     */
+    public static Set<String> openJobIds(@Nullable PersistentTasksCustomMetaData tasks) {
+        if (tasks == null) {
+            return Collections.emptySet();
+        }
+
+        return tasks.findTasks(JOB_TASK_NAME, task -> true)
+                .stream()
+                .map(t -> t.getId().substring(JOB_TASK_ID_PREFIX.length()))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * The datafeed Ids of started datafeed tasks
+     *
+     * @param tasks Persistent tasks. If null an empty set is returned.
+     * @return The Ids of running datafeed tasks
+     */
+    public static Set<String> startedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks) {
+        if (tasks == null) {
+            return Collections.emptySet();
+        }
+
+        return tasks.findTasks(DATAFEED_TASK_NAME, task -> true)
+                .stream()
+                .map(t -> t.getId().substring(DATAFEED_TASK_ID_PREFIX.length()))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Is there an ml anomaly detector job task for the job {@code jobId}?
+     * @param jobId The job id
+     * @param tasks Persistent tasks
+     * @return True if the job has a task
+     */
+    public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) {
+        return openJobIds(tasks).contains(jobId);
+    }
+
+    /**
+     * Read the active anomaly detector job tasks.
+     * Active tasks are not {@code JobState.CLOSED} or {@code JobState.FAILED}.
+     *
+     * @param tasks Persistent tasks
+     * @return The job tasks excluding closed and failed jobs
+     */
+    public static List<PersistentTasksCustomMetaData.PersistentTask<?>> activeJobTasks(PersistentTasksCustomMetaData tasks) {
+        return tasks.findTasks(JOB_TASK_NAME, task -> true)
+                .stream()
+                .filter(task -> ((JobTaskState) task.getState()).getState().isAnyOf(JobState.CLOSED, JobState.FAILED) == false)
+                .collect(Collectors.toList());
+    }
 }
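The MlTasks diff above namespaces persistent task ids by prefixing the raw job or datafeed id, then recovers the id sets by stripping that prefix with `substring`. The scheme can be sketched in isolation; `MlTaskIds` and `jobIds` are illustrative names for this standalone example, and the real `openJobIds` selects tasks by task name via `findTasks` rather than by filtering on the id prefix as done here:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class MlTaskIds {
    // Same prefixes as MlTasks in the diff above.
    static final String JOB_TASK_ID_PREFIX = "job-";
    static final String DATAFEED_TASK_ID_PREFIX = "datafeed-";

    // Namespace a job id into a persistent task id.
    static String jobTaskId(String jobId) {
        return JOB_TASK_ID_PREFIX + jobId;
    }

    static String datafeedTaskId(String datafeedId) {
        return DATAFEED_TASK_ID_PREFIX + datafeedId;
    }

    // Recover the raw job ids from a mixed list of task ids by stripping the
    // prefix, as openJobIds() does with substring().
    static Set<String> jobIds(List<String> taskIds) {
        return taskIds.stream()
                .filter(id -> id.startsWith(JOB_TASK_ID_PREFIX))
                .map(id -> id.substring(JOB_TASK_ID_PREFIX.length()))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // The prefixes keep a job and a datafeed with the same raw id distinct,
        // which is why "a datafeed id can be used as a job id" in the javadoc.
        assert !jobTaskId("x").equals(datafeedTaskId("x"));
        List<String> tasks = List.of(jobTaskId("farequote"), datafeedTaskId("farequote"));
        assert jobIds(tasks).equals(Set.of("farequote"));
    }
}
```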
