Skip to content

Commit 18a65c5

Browse files
authored
DFA Get Stats can return multiple responses if more than one error occurs (#60950)
If the search for get stats with multiple job Ids fails the listener is called for each failure. This change waits for all responses then returns the first error if there was one.
1 parent a5ef38c commit 18a65c5

File tree

2 files changed

+31
-3
lines changed

2 files changed

+31
-3
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import java.util.List;
6464
import java.util.Set;
6565
import java.util.concurrent.atomic.AtomicInteger;
66+
import java.util.concurrent.atomic.AtomicReference;
6667
import java.util.stream.Collectors;
6768

6869
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@@ -167,21 +168,32 @@ void gatherStatsForStoppedTasks(List<DataFrameAnalyticsConfig> configs, GetDataF
167168

168169
AtomicInteger counter = new AtomicInteger(stoppedConfigs.size());
169170
AtomicArray<Stats> jobStats = new AtomicArray<>(stoppedConfigs.size());
171+
AtomicReference<Exception> searchException = new AtomicReference<>();
170172
for (int i = 0; i < stoppedConfigs.size(); i++) {
171173
final int slot = i;
172174
DataFrameAnalyticsConfig config = stoppedConfigs.get(i);
173175
searchStats(config, ActionListener.wrap(
174176
stats -> {
175177
jobStats.set(slot, stats);
176178
if (counter.decrementAndGet() == 0) {
179+
if (searchException.get() != null) {
180+
listener.onFailure(searchException.get());
181+
return;
182+
}
177183
List<Stats> allTasksStats = new ArrayList<>(runningTasksResponse.getResponse().results());
178184
allTasksStats.addAll(jobStats.asList());
179185
Collections.sort(allTasksStats, Comparator.comparing(Stats::getId));
180186
listener.onResponse(new GetDataFrameAnalyticsStatsAction.Response(new QueryPage<>(
181187
allTasksStats, allTasksStats.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD)));
182188
}
183189
},
184-
listener::onFailure)
190+
e -> {
191+
// take the first error
192+
searchException.compareAndSet(null, e);
193+
if (counter.decrementAndGet() == 0) {
194+
listener.onFailure(e);
195+
}
196+
})
185197
);
186198
}
187199
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.Optional;
4747
import java.util.Set;
4848
import java.util.concurrent.atomic.AtomicInteger;
49+
import java.util.concurrent.atomic.AtomicReference;
4950
import java.util.function.Consumer;
5051
import java.util.stream.Collectors;
5152

@@ -143,7 +144,17 @@ void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc
143144
}
144145

145146
AtomicInteger counter = new AtomicInteger(closedJobIds.size());
147+
AtomicReference<Exception> searchException = new AtomicReference<>();
146148
AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(closedJobIds.size());
149+
150+
Consumer<Exception> errorHandler = e -> {
151+
// take the first error
152+
searchException.compareAndSet(null, e);
153+
if (counter.decrementAndGet() == 0) {
154+
listener.onFailure(e);
155+
}
156+
};
157+
147158
PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
148159
for (int i = 0; i < closedJobIds.size(); i++) {
149160
int slot = i;
@@ -159,14 +170,19 @@ void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc
159170
jobStats.set(slot, new JobStats(jobId, dataCounts, modelSizeStats, forecastStats, jobState,
160171
null, assignmentExplanation, null, timingStats));
161172
if (counter.decrementAndGet() == 0) {
173+
if (searchException.get() != null) {
174+
// there was an error
175+
listener.onFailure(searchException.get());
176+
return;
177+
}
162178
List<JobStats> results = response.getResponse().results();
163179
results.addAll(jobStats.asList());
164180
Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId));
165181
listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(),
166182
new QueryPage<>(results, results.size(), Job.RESULTS_FIELD)));
167183
}
168-
}, listener::onFailure);
169-
}, listener::onFailure);
184+
}, errorHandler);
185+
}, errorHandler);
170186
}
171187
}
172188

0 commit comments

Comments
 (0)