Commit 7a74677

[ML] Tighten up use of aliases rather than concrete indices (#37874)
We have read and write aliases for the ML results indices. However, the job still had methods that purported to reliably return the name of the concrete results index being used by the job. After reindexing prior to upgrade to 7.x the stored name will be wrong, so the method has been renamed and its comments made more explicit: the returned index name may not be the actual concrete index name for the lifetime of the job. Additionally, the selection of indices when deleting the job has been changed so that it works regardless of concrete index names.

All these changes are nice-to-have for 6.7 and 7.0, but they will become critical if we add rolling results indices in the 7.x release stream, as 6.7 and 7.0 nodes may have to operate in a mixed version cluster that includes a version that can roll results indices.
1 parent b38ee97 commit 7a74677
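
For context on the naming scheme this change leans on, here is a minimal, self-contained sketch of how a job's initial results index name relates to its read and write aliases. The class and values below are illustrative assumptions; the real constants and helpers live in AnomalyDetectorsIndexFields and AnomalyDetectorsIndex.

// Illustrative sketch only: the exact prefix and alias formats are assumptions based on
// this diff, not the authoritative definitions in AnomalyDetectorsIndex(Fields).
public final class MlResultsNamingSketch {

    // Assumed results index prefix used throughout the ML plugin.
    private static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-";
    private static final String RESULTS_INDEX_DEFAULT = "shared";

    // The index a job starts writing to: the shared index unless a custom name was configured.
    // After reindexing or rolling, the concrete index may no longer match this value.
    static String initialResultsIndexName(String customNameOrNull) {
        String suffix = customNameOrNull == null ? RESULTS_INDEX_DEFAULT : "custom-" + customNameOrNull;
        return RESULTS_INDEX_PREFIX + suffix;
    }

    // Read alias: used for all searches of a job's results, filtered to that job's documents.
    static String jobResultsAliasedName(String jobId) {
        return RESULTS_INDEX_PREFIX + jobId;
    }

    // Write alias: points at whichever concrete index the job currently writes to (assumed format).
    static String resultsWriteAlias(String jobId) {
        return RESULTS_INDEX_PREFIX + ".write-" + jobId;
    }

    public static void main(String[] args) {
        System.out.println(initialResultsIndexName(null));   // .ml-anomalies-shared
        System.out.println(initialResultsIndexName("carol")); // .ml-anomalies-custom-carol
        System.out.println(jobResultsAliasedName("my-job"));  // .ml-anomalies-my-job
        System.out.println(resultsWriteAlias("my-job"));      // .ml-anomalies-.write-my-job
    }
}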

File tree

7 files changed: +84 -43 lines changed


x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java

Lines changed: 13 additions & 7 deletions
@@ -266,18 +266,24 @@ public List<String> getGroups() {
     }
 
     /**
-     * The name of the index storing the job's results and state.
-     * This defaults to {@link #getId()} if a specific index name is not set.
-     * @return The job's index name
+     * A good starting name for the index storing the job's results.
+     * This defaults to the shared results index if a specific index name is not set.
+     * This method must <em>only</em> be used during initial job creation.
+     * After that the read/write aliases must always be used to access the job's
+     * results index, as the underlying index may roll or be reindexed.
+     * @return The job's initial results index name
      */
-    public String getResultsIndexName() {
+    public String getInitialResultsIndexName() {
         return AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + resultsIndexName;
     }
 
     /**
-     * Private version of getResultsIndexName so that a job can be built from another
-     * job and pass index name validation
-     * @return The job's index name, minus prefix
+     * Get the unmodified <code>results_index_name</code> field from the job.
+     * This is provided to allow a job to be copied via the builder.
+     * After creation this does not necessarily reflect the actual concrete
+     * index used by the job. A job's results must always be read and written
+     * using the read and write aliases.
+     * @return The job's configured "index name"
      */
     private String getResultsIndexNameNoPrefix() {
         return resultsIndexName;

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java

Lines changed: 2 additions & 2 deletions
@@ -418,14 +418,14 @@ public void testBuilder_setsDefaultIndexName() {
         Job.Builder builder = buildJobBuilder("foo");
         Job job = builder.build();
         assertEquals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT,
-                job.getResultsIndexName());
+                job.getInitialResultsIndexName());
     }
 
     public void testBuilder_setsIndexName() {
         Job.Builder builder = buildJobBuilder("foo");
         builder.setResultsIndexName("carol");
         Job job = builder.build();
-        assertEquals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-carol", job.getResultsIndexName());
+        assertEquals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-carol", job.getInitialResultsIndexName());
     }
 
     public void testBuilder_withInvalidIndexNameThrows() {

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java

Lines changed: 26 additions & 3 deletions
@@ -17,6 +17,7 @@
 import org.elasticsearch.test.SecuritySettingsSourceField;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
 import org.elasticsearch.xpack.ml.MachineLearning;
@@ -57,7 +58,7 @@ public void testPutJob_GivenFarequoteConfig() throws Exception {
         assertThat(responseAsString, containsString("\"job_id\":\"given-farequote-config-job\""));
     }
 
-    public void testGetJob_GivenNoSuchJob() throws Exception {
+    public void testGetJob_GivenNoSuchJob() {
         ResponseException e = expectThrows(ResponseException.class, () ->
                 client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/non-existing-job/_stats")));
 
@@ -519,8 +520,30 @@ public void testMultiIndexDelete() throws Exception {
         String indexName = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
         createFarequoteJob(jobId);
 
-        client().performRequest(new Request("PUT", indexName + "-001"));
-        client().performRequest(new Request("PUT", indexName + "-002"));
+        // Make the job's results span an extra two indices, i.e. three in total.
+        // To do this the job's results alias needs to encompass all three indices.
+        Request extraIndex1 = new Request("PUT", indexName + "-001");
+        extraIndex1.setJsonEntity("{\n" +
+                "    \"aliases\" : {\n" +
+                "        \"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId) + "\" : {\n" +
+                "            \"filter\" : {\n" +
+                "                \"term\" : {\"" + Job.ID + "\" : \"" + jobId + "\" }\n" +
+                "            }\n" +
+                "        }\n" +
+                "    }\n" +
+                "}");
+        client().performRequest(extraIndex1);
+        Request extraIndex2 = new Request("PUT", indexName + "-002");
+        extraIndex2.setJsonEntity("{\n" +
+                "    \"aliases\" : {\n" +
+                "        \"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId) + "\" : {\n" +
+                "            \"filter\" : {\n" +
+                "                \"term\" : {\"" + Job.ID + "\" : \"" + jobId + "\" }\n" +
+                "            }\n" +
+                "        }\n" +
+                "    }\n" +
+                "}");
+        client().performRequest(extraIndex2);
 
         String indicesBeforeDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity());
         assertThat(indicesBeforeDelete, containsString(indexName));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

Lines changed: 35 additions & 23 deletions
@@ -266,26 +266,25 @@ private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJ
     private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, String jobId,
                                     CheckedConsumer<Boolean, Exception> finishedHandler, Consumer<Exception> failureHandler) {
 
-        AtomicReference<String> indexName = new AtomicReference<>();
+        AtomicReference<String[]> indexNames = new AtomicReference<>();
 
         final ActionListener<AcknowledgedResponse> completionHandler = ActionListener.wrap(
                 response -> finishedHandler.accept(response.isAcknowledged()),
                 failureHandler);
 
-        // Step 8. If we did not drop the index and after DBQ state done, we delete the aliases
+        // Step 8. If we did not drop the indices and after DBQ state done, we delete the aliases
         ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
                 bulkByScrollResponse -> {
-                    if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume Index was deleted
+                    if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume indices were deleted
                         completionHandler.onResponse(new AcknowledgedResponse(true));
                     } else {
                         if (bulkByScrollResponse.isTimedOut()) {
-                            logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName.get(),
-                                    indexName.get() + "-*");
+                            logger.warn("[{}] DeleteByQuery for indices [{}] timed out.", jobId, String.join(", ", indexNames.get()));
                         }
                         if (!bulkByScrollResponse.getBulkFailures().isEmpty()) {
-                            logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}, {}].",
+                            logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}].",
                                     jobId, bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts(),
-                                    indexName.get(), indexName.get() + "-*");
+                                    String.join(", ", indexNames.get()));
                             for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
                                 logger.warn("DBQ failure: " + failure);
                             }
@@ -295,13 +294,12 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
                 },
                 failureHandler);
 
-        // Step 7. If we did not delete the index, we run a delete by query
+        // Step 7. If we did not delete the indices, we run a delete by query
         ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
                 response -> {
-                    if (response) {
-                        String indexPattern = indexName.get() + "-*";
-                        logger.info("Running DBQ on [" + indexName.get() + "," + indexPattern + "] for job [" + jobId + "]");
-                        DeleteByQueryRequest request = new DeleteByQueryRequest(indexName.get(), indexPattern);
+                    if (response && indexNames.get().length > 0) {
+                        logger.info("Running DBQ on [" + String.join(", ", indexNames.get()) + "] for job [" + jobId + "]");
+                        DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames.get());
                         ConstantScoreQueryBuilder query =
                                 new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
                         request.setQuery(query);
@@ -317,15 +315,15 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
                 },
                 failureHandler);
 
-        // Step 6. If we have any hits, that means we are NOT the only job on this index, and should not delete it
-        // if we do not have any hits, we can drop the index and then skip the DBQ and alias deletion
+        // Step 6. If we have any hits, that means we are NOT the only job on these indices, and should not delete the indices.
+        // If we do not have any hits, we can drop the indices and then skip the DBQ and alias deletion.
         ActionListener<SearchResponse> customIndexSearchHandler = ActionListener.wrap(
                 searchResponse -> {
                     if (searchResponse == null || searchResponse.getHits().totalHits > 0) {
                         deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion
                     } else {
-                        logger.info("Running DELETE Index on [" + indexName.get() + "] for job [" + jobId + "]");
-                        DeleteIndexRequest request = new DeleteIndexRequest(indexName.get());
+                        logger.info("Running DELETE Index on [" + String.join(", ", indexNames.get()) + "] for job [" + jobId + "]");
+                        DeleteIndexRequest request = new DeleteIndexRequest(indexNames.get());
                         request.indicesOptions(IndicesOptions.lenientExpandOpen());
                         // If we have deleted the index, then we don't need to delete the aliases or run the DBQ
                         executeAsyncWithOrigin(
@@ -347,29 +345,43 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
                 }
         );
 
-        // Step 5. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases
+        // Step 5. Determine if we are on shared indices by looking at whether the initial index was ".ml-anomalies-shared"
+        // or whether the indices that the job's results alias points to contain any documents from other jobs.
+        // TODO: this check is currently assuming that a job's results indices are either ALL shared or ALL
+        // dedicated to the job. We have considered functionality like rolling jobs that generate large
+        // volumes of results from shared to dedicated indices. On deletion such a job would have a mix of
+        // shared indices requiring DBQ and dedicated indices that could be simply dropped. The current
+        // functionality would apply DBQ to all these indices, which is safe but suboptimal. So this functionality
+        // should be revisited when we add rolling results index functionality, especially if we add the ability
+        // to switch a job over to a dedicated index for future results.
         ActionListener<Job> getJobHandler = ActionListener.wrap(
                 job -> {
-                    indexName.set(job.getResultsIndexName());
-                    if (indexName.get().equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
-                            AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
-                        //don't bother searching the index any further, we are on the default shared
+                    indexNames.set(indexNameExpressionResolver.concreteIndexNames(clusterService.state(),
+                            IndicesOptions.lenientExpandOpen(), AnomalyDetectorsIndex.jobResultsAliasedName(jobId)));
+                    // The job may no longer be using the initial shared index, but if it started off on a
+                    // shared index then it will still be on a shared index even if it's been reindexed
+                    if (job.getInitialResultsIndexName()
+                            .equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
+                        // don't bother searching the index any further, we are on the default shared
+                        customIndexSearchHandler.onResponse(null);
+                    } else if (indexNames.get().length == 0) {
+                        // don't bother searching the index any further - it's already been closed or deleted
                         customIndexSearchHandler.onResponse(null);
                     } else {
                         SearchSourceBuilder source = new SearchSourceBuilder()
                                 .size(1)
                                 .query(QueryBuilders.boolQuery().filter(
                                         QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))));
 
-                        SearchRequest searchRequest = new SearchRequest(indexName.get());
+                        SearchRequest searchRequest = new SearchRequest(indexNames.get());
                         searchRequest.source(source);
                         executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, customIndexSearchHandler);
                     }
                 },
                 failureHandler
        );
 
-        // Step 4. Get the job as the result index name is required
+        // Step 4. Get the job as the initial result index name is required
         ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
                 response -> {
                     jobManager.getJob(jobId, getJobHandler);
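
The heart of the delete-path change is that the set of indices to operate on is now resolved from the job's results read alias at deletion time, rather than derived from the job's stored index name. A rough sketch of that resolution step follows, reusing the same IndexNameExpressionResolver call the patch itself uses; the helper class and hard-coded prefix are hypothetical, introduced only for illustration.

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.settings.Settings;

// Hypothetical helper mirroring the resolution step added in deleteJobDocuments():
// it returns whatever concrete indices currently back the job's results read alias.
final class JobResultsIndexResolverSketch {

    // Assumed alias format; the plugin derives it via AnomalyDetectorsIndex.jobResultsAliasedName().
    private static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-";

    static String[] concreteResultsIndices(ClusterState state, String jobId) {
        IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY);
        // lenientExpandOpen so a missing alias yields an empty array rather than an error,
        // which the delete path treats as "nothing left to delete".
        return resolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(),
                RESULTS_INDEX_PREFIX + jobId);
    }
}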

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 5 additions & 5 deletions
@@ -152,9 +152,8 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
                                                                             int maxMachineMemoryPercent,
                                                                             MlMemoryTracker memoryTracker,
                                                                             Logger logger) {
-        String resultsIndexName = job.getResultsIndexName();
-
-        List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsIndexName, clusterState);
+        String resultsWriteAlias = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
+        List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsWriteAlias, clusterState);
         if (unavailableIndices.size() != 0) {
             String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
                     String.join(",", unavailableIndices) + "]";
@@ -393,9 +392,10 @@ static String[] indicesOfInterest(String resultsIndex) {
         return new String[]{AnomalyDetectorsIndex.jobStateIndexPattern(), resultsIndex, MlMetaIndex.INDEX_NAME};
     }
 
-    static List<String> verifyIndicesPrimaryShardsAreActive(String resultsIndex, ClusterState clusterState) {
+    static List<String> verifyIndicesPrimaryShardsAreActive(String resultsWriteIndex, ClusterState clusterState) {
         IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY);
-        String[] indices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), indicesOfInterest(resultsIndex));
+        String[] indices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(),
+                indicesOfInterest(resultsWriteIndex));
         List<String> unavailableIndices = new ArrayList<>(indices.length);
         for (String index : indices) {
             // Indices are created on demand from templates.

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java

Lines changed: 2 additions & 2 deletions
@@ -166,7 +166,7 @@ public void checkForLeftOverDocuments(Job job, ActionListener<Boolean> listener)
                 .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId()), Quantiles.v54DocumentId(job.getId())))
                 .setIndicesOptions(IndicesOptions.strictExpand());
 
-        String resultsIndexName = job.getResultsIndexName();
+        String resultsIndexName = job.getInitialResultsIndexName();
         SearchRequestBuilder resultDocSearch = client.prepareSearch(resultsIndexName)
                 .setIndicesOptions(IndicesOptions.lenientExpandOpen())
                 .setQuery(QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId()))
@@ -252,7 +252,7 @@ public void createJobResultIndex(Job job, ClusterState state, final ActionListen
 
         String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId());
         String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(job.getId());
-        String indexName = job.getResultsIndexName();
+        String indexName = job.getInitialResultsIndexName();
 
        final ActionListener<Boolean> createAliasListener = ActionListener.wrap(success -> {
             final IndicesAliasesRequest request = client.admin().indices().prepareAliases()
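
createJobResultIndex() is one of the few places where the initial concrete index name is still legitimately needed: the index is created (or reused) under that name and the read and write aliases are then pointed at it, after which all access goes through the aliases. Below is a simplified sketch of that alias wiring, not the actual implementation; the job_id field name, the synchronous call, and the helper class are assumptions, and the real code builds the request asynchronously with more error handling.

import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;

// Sketch only: read alias carries a job_id filter so searches see just this job's documents,
// while the write alias points at the single index the job currently writes to.
final class ResultsAliasSetupSketch {

    static void wireAliases(Client client, String jobId, String concreteIndexName,
                            String readAliasName, String writeAliasName) {
        client.admin().indices().prepareAliases()
                .addAliasAction(AliasActions.add()
                        .index(concreteIndexName)
                        .alias(readAliasName)
                        .filter(QueryBuilders.termQuery("job_id", jobId))) // assumed field name
                .addAliasAction(AliasActions.add()
                        .index(concreteIndexName)
                        .alias(writeAliasName))
                .get();
    }
}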

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java

Lines changed: 1 addition & 1 deletion
@@ -343,7 +343,7 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() {
         when(job.getId()).thenReturn("incompatible_type_job");
         when(job.getJobVersion()).thenReturn(Version.CURRENT);
         when(job.getJobType()).thenReturn("incompatible_type");
-        when(job.getResultsIndexName()).thenReturn("shared");
+        when(job.getInitialResultsIndexName()).thenReturn("shared");
 
         cs.nodes(nodes);
         metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
