Skip to content

Commit 0804b8c

Browse files
authored
[ML] Wait for shards to initialize after creating ML internal indices (#59027)
There have been a few test failures that are likely caused by tests performing actions that use ML indices immediately after the actions that create those ML indices. Currently this can result in attempts to search a newly created index before its shards have initialized. This change makes the method that creates the affected internal ML indices (state and stats) wait for the shards of a newly created index to be initialized before returning. Fixes #54887 Fixes #55221 Fixes #55807 Fixes #57102 Fixes #58841 Fixes #59011
1 parent 2f389a7 commit 0804b8c

File tree

4 files changed

+53
-9
lines changed

4 files changed

+53
-9
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java

+40-7
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import org.apache.logging.log4j.Logger;
1010
import org.elasticsearch.ResourceAlreadyExistsException;
1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
13+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
1214
import org.elasticsearch.action.admin.indices.alias.Alias;
1315
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
1416
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
@@ -18,6 +20,7 @@
1820
import org.elasticsearch.action.support.IndicesOptions;
1921
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2022
import org.elasticsearch.client.Client;
23+
import org.elasticsearch.client.Requests;
2124
import org.elasticsearch.cluster.ClusterState;
2225
import org.elasticsearch.cluster.metadata.IndexMetadata;
2326
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -69,13 +72,27 @@ private MlIndexAndAlias() {}
6972
* Adds an {@code alias} to that index if it was created,
7073
* or to the index with the highest suffix if the index did not have to be created.
7174
* The listener is notified with a {@code boolean} that informs whether the index or the alias were created.
75+
* If the index is created, the listener is not called until the index is ready to use via the supplied alias,
76+
* so that a method that receives a success response from this method can safely use the index immediately.
7277
*/
7378
public static void createIndexAndAliasIfNecessary(Client client,
7479
ClusterState clusterState,
7580
IndexNameExpressionResolver resolver,
7681
String indexPatternPrefix,
7782
String alias,
78-
ActionListener<Boolean> listener) {
83+
ActionListener<Boolean> finalListener) {
84+
85+
// If both the index and alias were successfully created then wait until the shards of the index that the alias points to are ready
86+
ActionListener<Boolean> indexCreatedListener = ActionListener.wrap(
87+
created -> {
88+
if (created) {
89+
waitForShardsReady(client, alias, finalListener);
90+
} else {
91+
finalListener.onResponse(false);
92+
}
93+
},
94+
finalListener::onFailure
95+
);
7996

8097
String legacyIndexWithoutSuffix = indexPatternPrefix;
8198
String indexPattern = indexPatternPrefix + "*";
@@ -89,15 +106,15 @@ public static void createIndexAndAliasIfNecessary(Client client,
89106

90107
if (concreteIndexNames.length == 0) {
91108
if (indexPointedByCurrentWriteAlias.isEmpty()) {
92-
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, listener);
109+
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, indexCreatedListener);
93110
return;
94111
}
95112
logger.error(
96113
"There are no indices matching '{}' pattern but '{}' alias points at [{}]. This should never happen.",
97114
indexPattern, alias, indexPointedByCurrentWriteAlias.get());
98115
} else if (concreteIndexNames.length == 1 && concreteIndexNames[0].equals(legacyIndexWithoutSuffix)) {
99116
if (indexPointedByCurrentWriteAlias.isEmpty()) {
100-
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, listener);
117+
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, indexCreatedListener);
101118
return;
102119
}
103120
if (indexPointedByCurrentWriteAlias.get().getIndex().getName().equals(legacyIndexWithoutSuffix)) {
@@ -107,8 +124,8 @@ public static void createIndexAndAliasIfNecessary(Client client,
107124
alias,
108125
false,
109126
ActionListener.wrap(
110-
unused -> updateWriteAlias(client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, listener),
111-
listener::onFailure)
127+
unused -> updateWriteAlias(client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, indexCreatedListener),
128+
finalListener::onFailure)
112129
);
113130
return;
114131
}
@@ -119,12 +136,28 @@ public static void createIndexAndAliasIfNecessary(Client client,
119136
if (indexPointedByCurrentWriteAlias.isEmpty()) {
120137
assert concreteIndexNames.length > 0;
121138
String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get();
122-
updateWriteAlias(client, alias, null, latestConcreteIndexName, listener);
139+
updateWriteAlias(client, alias, null, latestConcreteIndexName, finalListener);
123140
return;
124141
}
125142
}
126143
// If the alias is set, there is nothing more to do.
127-
listener.onResponse(false);
144+
finalListener.onResponse(false);
145+
}
146+
147+
/**
 * Issues a cluster health request for {@code index} and reports the outcome to {@code listener}.
 * The request waits for yellow status, no relocating shards and no initializing shards.
 *
 * @param client   used to obtain the thread context and the cluster admin client that runs the health request
 * @param index    name handed to {@code Requests.clusterHealthRequest}
 *                 (NOTE(review): the caller visible in this diff passes the alias name here - confirm that is intended)
 * @param listener receives {@code true} if the health response did not time out, {@code false} if it did;
 *                 a failure of the health request is forwarded to {@code listener.onFailure}
 */
private static void waitForShardsReady(Client client, String index, ActionListener<Boolean> listener) {
148+
ClusterHealthRequest healthRequest = Requests.clusterHealthRequest(index)
149+
.waitForYellowStatus()
150+
.waitForNoRelocatingShards(true)
151+
.waitForNoInitializingShards(true);
152+
// The health request is executed with the ML origin set on the thread context.
executeAsyncWithOrigin(
153+
client.threadPool().getThreadContext(),
154+
ML_ORIGIN,
155+
healthRequest,
156+
ActionListener.<ClusterHealthResponse>wrap(
157+
// A timed-out health response is reported to the listener as false, not as a failure.
response -> listener.onResponse(response.isTimedOut() == false),
158+
listener::onFailure),
159+
client.admin().cluster()::health
160+
);
128161
}
129162

130163
private static void createFirstConcreteIndex(Client client,

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java

+13
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import org.elasticsearch.Version;
99
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
11+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
1012
import org.elasticsearch.action.admin.indices.alias.Alias;
1113
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
1214
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
@@ -19,6 +21,7 @@
1921
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2022
import org.elasticsearch.client.AdminClient;
2123
import org.elasticsearch.client.Client;
24+
import org.elasticsearch.client.ClusterAdminClient;
2225
import org.elasticsearch.client.IndicesAdminClient;
2326
import org.elasticsearch.cluster.ClusterName;
2427
import org.elasticsearch.cluster.ClusterState;
@@ -66,6 +69,7 @@ public class MlIndexAndAliasTests extends ESTestCase {
6669

6770
private ThreadPool threadPool;
6871
private IndicesAdminClient indicesAdminClient;
72+
private ClusterAdminClient clusterAdminClient;
6973
private AdminClient adminClient;
7074
private Client client;
7175
private ActionListener<Boolean> listener;
@@ -85,8 +89,17 @@ public void setUpMocks() {
8589
when(indicesAdminClient.prepareAliases()).thenReturn(new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE));
8690
doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).aliases(any(), any());
8791

92+
clusterAdminClient = mock(ClusterAdminClient.class);
93+
doAnswer(invocationOnMock -> {
94+
@SuppressWarnings("unchecked")
95+
ActionListener<ClusterHealthResponse> listener = (ActionListener<ClusterHealthResponse>) invocationOnMock.getArguments()[1];
96+
listener.onResponse(new ClusterHealthResponse());
97+
return null;
98+
}).when(clusterAdminClient).health(any(ClusterHealthRequest.class), any(ActionListener.class));
99+
88100
adminClient = mock(AdminClient.class);
89101
when(adminClient.indices()).thenReturn(indicesAdminClient);
102+
when(adminClient.cluster()).thenReturn(clusterAdminClient);
90103

91104
client = mock(Client.class);
92105
when(client.threadPool()).thenReturn(threadPool);

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@ public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Excepti
137137
is(arrayContaining(".ml-state-000001", ".ml-state-000005", ".ml-state-000007")));
138138
}
139139

140-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/57102")
141140
public void testDeleteExpiredDataWithStandardThrottle() throws Exception {
142141
testExpiredDeletion(-1.0f, 100);
143142
}

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,6 @@ public void testStopAndRestart() throws Exception {
297297
assertMlResultsFieldMappings(destIndex, predictedClassField, "double");
298298
}
299299

300-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55807")
301300
public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception {
302301
String sourceIndex = "regression_two_jobs_with_same_randomize_seed_source";
303302
indexData(sourceIndex, 100, 0);

0 commit comments

Comments
 (0)