Skip to content

Commit 3e4b0aa

Browse files
committed
[ML] Avoid memory tracker race condition (#69290)
This change fixes a race condition that can occur if the return value of memoryTracker.isRecentlyRefreshed() changes between two calls that are assumed to return the same value. The solution is to just call the method once and pass that value to the other place where it is needed. Then all related code makes decisions based on the same view of whether the memory tracker has been recently refreshed or not. Fixes #69289
1 parent 0a1d422 commit 3e4b0aa

File tree

4 files changed

+9
-7
lines changed

4 files changed

+9
-7
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -609,7 +609,8 @@ protected AllocatedPersistentTask createTask(
609609
@Override
610610
public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params, ClusterState clusterState) {
611611
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
612-
Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment = getPotentialAssignment(params, clusterState);
612+
Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment =
613+
getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
613614
if (optionalAssignment.isPresent()) {
614615
return optionalAssignment.get();
615616
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ public SnapshotUpgradeTaskExecutor(Settings settings,
7979
@Override
8080
public PersistentTasksCustomMetadata.Assignment getAssignment(SnapshotUpgradeTaskParams params, ClusterState clusterState) {
8181
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
82-
Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment = getPotentialAssignment(params, clusterState);
82+
Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment =
83+
getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
8384
if (optionalAssignment.isPresent()) {
8485
return optionalAssignment.get();
8586
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clu
114114
return AWAITING_MIGRATION;
115115
}
116116
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
117-
Optional<Assignment> optionalAssignment = getPotentialAssignment(params, clusterState);
117+
Optional<Assignment> optionalAssignment = getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
118118
if (optionalAssignment.isPresent()) {
119119
return optionalAssignment.get();
120120
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@ protected boolean allowsMissingIndices() {
152152
return true;
153153
}
154154

155-
public Optional<PersistentTasksCustomMetadata.Assignment> getPotentialAssignment(Params params, ClusterState clusterState) {
155+
public Optional<PersistentTasksCustomMetadata.Assignment> getPotentialAssignment(Params params, ClusterState clusterState,
156+
boolean isMemoryTrackerRecentlyRefreshed) {
156157
// If we are waiting for an upgrade to complete, we should not assign to a node
157158
if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) {
158159
return Optional.of(AWAITING_UPGRADE);
@@ -165,7 +166,7 @@ public Optional<PersistentTasksCustomMetadata.Assignment> getPotentialAssignment
165166
if (missingIndices.isPresent()) {
166167
return missingIndices;
167168
}
168-
Optional<PersistentTasksCustomMetadata.Assignment> staleMemory = checkMemoryFreshness(jobId);
169+
Optional<PersistentTasksCustomMetadata.Assignment> staleMemory = checkMemoryFreshness(jobId, isMemoryTrackerRecentlyRefreshed);
169170
if (staleMemory.isPresent()) {
170171
return staleMemory;
171172
}
@@ -212,8 +213,7 @@ public Optional<PersistentTasksCustomMetadata.Assignment> checkRequiredIndices(S
212213
return Optional.empty();
213214
}
214215

215-
public Optional<PersistentTasksCustomMetadata.Assignment> checkMemoryFreshness(String jobId) {
216-
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
216+
public Optional<PersistentTasksCustomMetadata.Assignment> checkMemoryFreshness(String jobId, boolean isMemoryTrackerRecentlyRefreshed) {
217217
if (isMemoryTrackerRecentlyRefreshed == false) {
218218
boolean scheduledRefresh = memoryTracker.asyncRefresh();
219219
if (scheduledRefresh) {

0 commit comments

Comments
 (0)