Skip to content

Commit 3c01f1c

Browse files
[ML] Fix master node deadlock during ML daily maintenance (#31836)
This is the implementation for master and 6.x of #31691. Native tests are changed to use multi-node clusters in #31757. Relates #31683
1 parent 8594017 commit 3c01f1c

File tree

4 files changed

+99
-25
lines changed

4 files changed

+99
-25
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.elasticsearch.action.ActionListener;
99
import org.elasticsearch.action.support.ActionFilters;
1010
import org.elasticsearch.action.support.HandledTransportAction;
11+
import org.elasticsearch.action.support.ThreadedActionListener;
1112
import org.elasticsearch.client.Client;
1213
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1314
import org.elasticsearch.cluster.service.ClusterService;
@@ -56,8 +57,8 @@ private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response>
5657
Auditor auditor = new Auditor(client, clusterService.nodeName());
5758
List<MlDataRemover> dataRemovers = Arrays.asList(
5859
new ExpiredResultsRemover(client, clusterService, auditor),
59-
new ExpiredForecastsRemover(client),
60-
new ExpiredModelSnapshotsRemover(client, clusterService),
60+
new ExpiredForecastsRemover(client, threadPool),
61+
new ExpiredModelSnapshotsRemover(client, threadPool, clusterService),
6162
new UnusedStateRemover(client, clusterService)
6263
);
6364
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
@@ -68,9 +69,15 @@ private void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
6869
ActionListener<DeleteExpiredDataAction.Response> listener) {
6970
if (mlDataRemoversIterator.hasNext()) {
7071
MlDataRemover remover = mlDataRemoversIterator.next();
71-
remover.remove(ActionListener.wrap(
72-
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener),
73-
listener::onFailure));
72+
ActionListener<Boolean> nextListener = ActionListener.wrap(
73+
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener), listener::onFailure);
74+
// Removing expired ML data and artifacts requires multiple operations.
75+
// These are queued up and executed sequentially in the action listener.
76+
// The chained calls must all run on the ML utility thread pool, NOT on the
77+
// thread the previous action returned on, which in the case of a
78+
// transport_client_boss thread is a disaster.
79+
remover.remove(new ThreadedActionListener<>(logger, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, nextListener,
80+
false));
7481
} else {
7582
logger.info("Completed deletion of expired data");
7683
listener.onResponse(new DeleteExpiredDataAction.Response(true));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.search.SearchAction;
1212
import org.elasticsearch.action.search.SearchRequest;
1313
import org.elasticsearch.action.search.SearchResponse;
14+
import org.elasticsearch.action.support.ThreadedActionListener;
1415
import org.elasticsearch.client.Client;
1516
import org.elasticsearch.common.logging.Loggers;
1617
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@@ -27,11 +28,13 @@
2728
import org.elasticsearch.search.SearchHit;
2829
import org.elasticsearch.search.SearchHits;
2930
import org.elasticsearch.search.builder.SearchSourceBuilder;
31+
import org.elasticsearch.threadpool.ThreadPool;
3032
import org.elasticsearch.xpack.core.ml.job.config.Job;
3133
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
3234
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
3335
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
3436
import org.elasticsearch.xpack.core.ml.job.results.Result;
37+
import org.elasticsearch.xpack.ml.MachineLearning;
3538
import org.joda.time.DateTime;
3639
import org.joda.time.chrono.ISOChronology;
3740

@@ -57,10 +60,12 @@ public class ExpiredForecastsRemover implements MlDataRemover {
5760
private static final String RESULTS_INDEX_PATTERN = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*";
5861

5962
private final Client client;
63+
private final ThreadPool threadPool;
6064
private final long cutoffEpochMs;
6165

62-
public ExpiredForecastsRemover(Client client) {
66+
public ExpiredForecastsRemover(Client client, ThreadPool threadPool) {
6367
this.client = Objects.requireNonNull(client);
68+
this.threadPool = Objects.requireNonNull(threadPool);
6469
this.cutoffEpochMs = DateTime.now(ISOChronology.getInstance()).getMillis();
6570
}
6671

@@ -79,7 +84,8 @@ public void remove(ActionListener<Boolean> listener) {
7984

8085
SearchRequest searchRequest = new SearchRequest(RESULTS_INDEX_PATTERN);
8186
searchRequest.source(source);
82-
client.execute(SearchAction.INSTANCE, searchRequest, forecastStatsHandler);
87+
client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
88+
MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false));
8389
}
8490

8591
private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,21 @@
1111
import org.elasticsearch.action.search.SearchAction;
1212
import org.elasticsearch.action.search.SearchRequest;
1313
import org.elasticsearch.action.search.SearchResponse;
14+
import org.elasticsearch.action.support.ThreadedActionListener;
1415
import org.elasticsearch.client.Client;
1516
import org.elasticsearch.cluster.service.ClusterService;
1617
import org.elasticsearch.common.logging.Loggers;
1718
import org.elasticsearch.index.query.QueryBuilder;
1819
import org.elasticsearch.index.query.QueryBuilders;
1920
import org.elasticsearch.search.SearchHit;
2021
import org.elasticsearch.search.builder.SearchSourceBuilder;
22+
import org.elasticsearch.threadpool.ThreadPool;
2123
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
2224
import org.elasticsearch.xpack.core.ml.job.config.Job;
2325
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
2426
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
2527
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
28+
import org.elasticsearch.xpack.ml.MachineLearning;
2629

2730
import java.util.ArrayList;
2831
import java.util.Iterator;
@@ -51,10 +54,12 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
5154
private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000;
5255

5356
private final Client client;
57+
private final ThreadPool threadPool;
5458

55-
public ExpiredModelSnapshotsRemover(Client client, ClusterService clusterService) {
59+
public ExpiredModelSnapshotsRemover(Client client, ThreadPool threadPool, ClusterService clusterService) {
5660
super(clusterService);
5761
this.client = Objects.requireNonNull(client);
62+
this.threadPool = Objects.requireNonNull(threadPool);
5863
}
5964

6065
@Override
@@ -84,7 +89,12 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Bool
8489

8590
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE));
8691

87-
client.execute(SearchAction.INSTANCE, searchRequest, new ActionListener<SearchResponse>() {
92+
client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
93+
MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false));
94+
}
95+
96+
private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, ActionListener<Boolean> listener) {
97+
return new ActionListener<SearchResponse>() {
8898
@Override
8999
public void onResponse(SearchResponse searchResponse) {
90100
try {
@@ -100,9 +110,9 @@ public void onResponse(SearchResponse searchResponse) {
100110

101111
@Override
102112
public void onFailure(Exception e) {
103-
listener.onFailure(new ElasticsearchException("[" + job.getId() + "] Search for expired snapshots failed", e));
113+
listener.onFailure(new ElasticsearchException("[" + jobId + "] Search for expired snapshots failed", e));
104114
}
105-
});
115+
};
106116
}
107117

108118
private void deleteModelSnapshots(Iterator<ModelSnapshot> modelSnapshotIterator, ActionListener<Boolean> listener) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,25 @@
1414
import org.elasticsearch.cluster.metadata.MetaData;
1515
import org.elasticsearch.cluster.service.ClusterService;
1616
import org.elasticsearch.common.bytes.BytesReference;
17+
import org.elasticsearch.common.settings.Settings;
1718
import org.elasticsearch.common.xcontent.ToXContent;
1819
import org.elasticsearch.common.xcontent.XContentBuilder;
1920
import org.elasticsearch.common.xcontent.json.JsonXContent;
2021
import org.elasticsearch.mock.orig.Mockito;
2122
import org.elasticsearch.search.SearchHit;
2223
import org.elasticsearch.search.SearchHits;
2324
import org.elasticsearch.test.ESTestCase;
25+
import org.elasticsearch.threadpool.FixedExecutorBuilder;
26+
import org.elasticsearch.threadpool.ThreadPool;
2427
import org.elasticsearch.xpack.core.ml.MLMetadataField;
2528
import org.elasticsearch.xpack.core.ml.MlMetadata;
2629
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
2730
import org.elasticsearch.xpack.core.ml.job.config.Job;
2831
import org.elasticsearch.xpack.core.ml.job.config.JobTests;
2932
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
3033
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
34+
import org.elasticsearch.xpack.ml.MachineLearning;
35+
import org.junit.After;
3136
import org.junit.Before;
3237
import org.mockito.invocation.InvocationOnMock;
3338
import org.mockito.stubbing.Answer;
@@ -38,24 +43,27 @@
3843
import java.util.HashMap;
3944
import java.util.List;
4045
import java.util.Map;
46+
import java.util.concurrent.CountDownLatch;
47+
import java.util.concurrent.TimeUnit;
4148

4249
import static org.hamcrest.Matchers.equalTo;
50+
import static org.hamcrest.Matchers.is;
4351
import static org.mockito.Matchers.any;
4452
import static org.mockito.Matchers.same;
4553
import static org.mockito.Mockito.doAnswer;
4654
import static org.mockito.Mockito.mock;
47-
import static org.mockito.Mockito.verify;
4855
import static org.mockito.Mockito.when;
4956

5057
public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
5158

5259
private Client client;
60+
private ThreadPool threadPool;
5361
private ClusterService clusterService;
5462
private ClusterState clusterState;
5563
private List<SearchRequest> capturedSearchRequests;
5664
private List<DeleteModelSnapshotAction.Request> capturedDeleteModelSnapshotRequests;
5765
private List<SearchResponse> searchResponsesPerCall;
58-
private ActionListener<Boolean> listener;
66+
private TestListener listener;
5967

6068
@Before
6169
public void setUpTests() {
@@ -66,7 +74,19 @@ public void setUpTests() {
6674
clusterState = mock(ClusterState.class);
6775
when(clusterService.state()).thenReturn(clusterState);
6876
client = mock(Client.class);
69-
listener = mock(ActionListener.class);
77+
listener = new TestListener();
78+
79+
// Init thread pool
80+
Settings settings = Settings.builder()
81+
.put("node.name", "expired_model_snapshots_remover_test")
82+
.build();
83+
threadPool = new ThreadPool(settings,
84+
new FixedExecutorBuilder(settings, MachineLearning.UTILITY_THREAD_POOL_NAME, 1, 1000, ""));
85+
}
86+
87+
@After
88+
public void shutdownThreadPool() throws InterruptedException {
89+
terminate(threadPool);
7090
}
7191

7292
public void testRemove_GivenJobsWithoutRetentionPolicy() {
@@ -78,7 +98,8 @@ public void testRemove_GivenJobsWithoutRetentionPolicy() {
7898

7999
createExpiredModelSnapshotsRemover().remove(listener);
80100

81-
verify(listener).onResponse(true);
101+
listener.waitToCompletion();
102+
assertThat(listener.success, is(true));
82103
Mockito.verifyNoMoreInteractions(client);
83104
}
84105

@@ -88,7 +109,8 @@ public void testRemove_GivenJobWithoutActiveSnapshot() {
88109

89110
createExpiredModelSnapshotsRemover().remove(listener);
90111

91-
verify(listener).onResponse(true);
112+
listener.waitToCompletion();
113+
assertThat(listener.success, is(true));
92114
Mockito.verifyNoMoreInteractions(client);
93115
}
94116

@@ -108,6 +130,9 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException
108130

109131
createExpiredModelSnapshotsRemover().remove(listener);
110132

133+
listener.waitToCompletion();
134+
assertThat(listener.success, is(true));
135+
111136
assertThat(capturedSearchRequests.size(), equalTo(2));
112137
SearchRequest searchRequest = capturedSearchRequests.get(0);
113138
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
@@ -124,8 +149,6 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException
124149
deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2);
125150
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2"));
126151
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1"));
127-
128-
verify(listener).onResponse(true);
129152
}
130153

131154
public void testRemove_GivenClientSearchRequestsFail() throws IOException {
@@ -144,13 +167,14 @@ public void testRemove_GivenClientSearchRequestsFail() throws IOException {
144167

145168
createExpiredModelSnapshotsRemover().remove(listener);
146169

170+
listener.waitToCompletion();
171+
assertThat(listener.success, is(false));
172+
147173
assertThat(capturedSearchRequests.size(), equalTo(1));
148174
SearchRequest searchRequest = capturedSearchRequests.get(0);
149175
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
150176

151177
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0));
152-
153-
verify(listener).onFailure(any());
154178
}
155179

156180
public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOException {
@@ -169,6 +193,9 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio
169193

170194
createExpiredModelSnapshotsRemover().remove(listener);
171195

196+
listener.waitToCompletion();
197+
assertThat(listener.success, is(false));
198+
172199
assertThat(capturedSearchRequests.size(), equalTo(1));
173200
SearchRequest searchRequest = capturedSearchRequests.get(0);
174201
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
@@ -177,8 +204,6 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio
177204
DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0);
178205
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1"));
179206
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1"));
180-
181-
verify(listener).onFailure(any());
182207
}
183208

184209
private void givenJobs(List<Job> jobs) {
@@ -192,7 +217,7 @@ private void givenJobs(List<Job> jobs) {
192217
}
193218

194219
private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() {
195-
return new ExpiredModelSnapshotsRemover(client, clusterService);
220+
return new ExpiredModelSnapshotsRemover(client, threadPool, clusterService);
196221
}
197222

198223
private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId) {
@@ -230,7 +255,7 @@ private void givenClientRequests(boolean shouldSearchRequestsSucceed, boolean sh
230255
int callCount = 0;
231256

232257
@Override
233-
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
258+
public Void answer(InvocationOnMock invocationOnMock) {
234259
SearchRequest searchRequest = (SearchRequest) invocationOnMock.getArguments()[1];
235260
capturedSearchRequests.add(searchRequest);
236261
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[2];
@@ -244,7 +269,7 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
244269
}).when(client).execute(same(SearchAction.INSTANCE), any(), any());
245270
doAnswer(new Answer<Void>() {
246271
@Override
247-
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
272+
public Void answer(InvocationOnMock invocationOnMock) {
248273
capturedDeleteModelSnapshotRequests.add((DeleteModelSnapshotAction.Request) invocationOnMock.getArguments()[1]);
249274
ActionListener<DeleteModelSnapshotAction.Response> listener =
250275
(ActionListener<DeleteModelSnapshotAction.Response>) invocationOnMock.getArguments()[2];
@@ -257,4 +282,30 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
257282
}
258283
}).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any());
259284
}
285+
286+
private class TestListener implements ActionListener<Boolean> {
287+
288+
private boolean success;
289+
private final CountDownLatch latch = new CountDownLatch(1);
290+
291+
@Override
292+
public void onResponse(Boolean aBoolean) {
293+
success = aBoolean;
294+
latch.countDown();
295+
}
296+
297+
@Override
298+
public void onFailure(Exception e) {
299+
latch.countDown();
300+
}
301+
302+
public void waitToCompletion() {
303+
try {
304+
latch.await(10, TimeUnit.SECONDS);
305+
} catch (InterruptedException e) {
306+
fail("listener timed out before completing");
307+
}
308+
}
309+
}
310+
260311
}

0 commit comments

Comments
 (0)