Skip to content

Commit 7ae57d6

Browse files
authored
[ML] Fix ML memory tracker lockup when inner step fails (#44158)
When the ML memory tracker is refreshed and a refresh is already in progress the idea is that the second and subsequent refresh requests receive the same response as the currently in progress refresh. There was a bug that if a refresh failed then the ML memory tracker's view of whether a refresh was in progress was not reset, leading to every subsequent request being registered to receive a response that would never come. This change makes the ML memory tracker pass on failures as well as successes to all interested parties and reset the list of interested parties so that further refresh attempts are possible after either a success or failure. This fixes problem 1 of #44156
1 parent 6bd1853 commit 7ae57d6

File tree

2 files changed

+75
-1
lines changed

2 files changed

+75
-1
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,18 @@ void refresh(PersistentTasksCustomMetaData persistentTasks, ActionListener<Void>
299299
}
300300
fullRefreshCompletionListeners.clear();
301301
}
302-
}, onCompletion::onFailure);
302+
},
303+
e -> {
304+
synchronized (fullRefreshCompletionListeners) {
305+
assert fullRefreshCompletionListeners.isEmpty() == false;
306+
for (ActionListener<Void> listener : fullRefreshCompletionListeners) {
307+
listener.onFailure(e);
308+
}
309+
// It's critical that we empty out the current listener list on
310+
// error otherwise subsequent retries to refresh will be ignored
311+
fullRefreshCompletionListeners.clear();
312+
}
313+
});
303314

304315
// persistentTasks will be null if there's never been a persistent task created in this cluster
305316
if (persistentTasks == null) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.xpack.core.ml.MlTasks;
2020
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
2121
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
22+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
2223
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
2324
import org.elasticsearch.xpack.core.ml.job.config.Job;
2425
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
@@ -32,11 +33,13 @@
3233
import java.util.List;
3334
import java.util.Map;
3435
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.atomic.AtomicBoolean;
3537
import java.util.concurrent.atomic.AtomicReference;
3638
import java.util.function.Consumer;
3739

3840
import static org.hamcrest.CoreMatchers.instanceOf;
3941
import static org.mockito.Matchers.any;
42+
import static org.mockito.Matchers.anyBoolean;
4043
import static org.mockito.Matchers.eq;
4144
import static org.mockito.Mockito.anyString;
4245
import static org.mockito.Mockito.doAnswer;
@@ -125,6 +128,66 @@ public void testRefreshAll() {
125128
}
126129
}
127130

131+
public void testRefreshAllFailure() {
132+
133+
Map<String, PersistentTasksCustomMetaData.PersistentTask<?>> tasks = new HashMap<>();
134+
135+
int numAnomalyDetectorJobTasks = randomIntBetween(2, 5);
136+
for (int i = 1; i <= numAnomalyDetectorJobTasks; ++i) {
137+
String jobId = "job" + i;
138+
PersistentTasksCustomMetaData.PersistentTask<?> task = makeTestAnomalyDetectorTask(jobId);
139+
tasks.put(task.getId(), task);
140+
}
141+
142+
int numDataFrameAnalyticsTasks = randomIntBetween(2, 5);
143+
for (int i = 1; i <= numDataFrameAnalyticsTasks; ++i) {
144+
String id = "analytics" + i;
145+
PersistentTasksCustomMetaData.PersistentTask<?> task = makeTestDataFrameAnalyticsTask(id);
146+
tasks.put(task.getId(), task);
147+
}
148+
149+
PersistentTasksCustomMetaData persistentTasks =
150+
new PersistentTasksCustomMetaData(numAnomalyDetectorJobTasks + numDataFrameAnalyticsTasks, tasks);
151+
152+
doAnswer(invocation -> {
153+
@SuppressWarnings("unchecked")
154+
Consumer<Long> listener = (Consumer<Long>) invocation.getArguments()[3];
155+
listener.accept(randomLongBetween(1000, 1000000));
156+
return null;
157+
}).when(jobResultsProvider).getEstablishedMemoryUsage(anyString(), any(), any(), any(Consumer.class), any());
158+
159+
// First run a refresh using a component that calls the onFailure method of the listener
160+
161+
doAnswer(invocation -> {
162+
@SuppressWarnings("unchecked")
163+
ActionListener<List<DataFrameAnalyticsConfig>> listener =
164+
(ActionListener<List<DataFrameAnalyticsConfig>>) invocation.getArguments()[2];
165+
listener.onFailure(new IllegalArgumentException("computer says no"));
166+
return null;
167+
}).when(configProvider).getMultiple(anyString(), anyBoolean(), any(ActionListener.class));
168+
169+
AtomicBoolean gotErrorResponse = new AtomicBoolean(false);
170+
memoryTracker.refresh(persistentTasks,
171+
ActionListener.wrap(aVoid -> fail("Expected error response"), e -> gotErrorResponse.set(true)));
172+
assertTrue(gotErrorResponse.get());
173+
174+
// Now run another refresh using a component that calls the onResponse method of the listener - this
175+
// proves that the ML memory tracker has not been permanently blocked up by the previous failure
176+
177+
doAnswer(invocation -> {
178+
@SuppressWarnings("unchecked")
179+
ActionListener<List<DataFrameAnalyticsConfig>> listener =
180+
(ActionListener<List<DataFrameAnalyticsConfig>>) invocation.getArguments()[2];
181+
listener.onResponse(Collections.emptyList());
182+
return null;
183+
}).when(configProvider).getMultiple(anyString(), anyBoolean(), any(ActionListener.class));
184+
185+
AtomicBoolean gotSuccessResponse = new AtomicBoolean(false);
186+
memoryTracker.refresh(persistentTasks,
187+
ActionListener.wrap(aVoid -> gotSuccessResponse.set(true), e -> fail("Expected success response")));
188+
assertTrue(gotSuccessResponse.get());
189+
}
190+
128191
public void testRefreshOneAnomalyDetectorJob() {
129192

130193
boolean isMaster = randomBoolean();

0 commit comments

Comments
 (0)