|
9 | 9 | import org.apache.logging.log4j.LogManager;
|
10 | 10 | import org.apache.logging.log4j.Logger;
|
11 | 11 | import org.elasticsearch.action.ActionListener;
|
| 12 | +import org.elasticsearch.action.ActionRunnable; |
12 | 13 | import org.elasticsearch.action.support.ActionFilters;
|
13 | 14 | import org.elasticsearch.action.support.HandledTransportAction;
|
14 | 15 | import org.elasticsearch.action.support.ThreadedActionListener;
|
@@ -137,17 +138,25 @@ protected void doExecute(
|
137 | 138 | if (Strings.isNullOrEmpty(request.getJobId()) || Strings.isAllOrWildcard(request.getJobId())) {
|
138 | 139 | List<MlDataRemover> dataRemovers = createDataRemovers(client, taskId, anomalyDetectionAuditor);
|
139 | 140 | threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
|
140 |
| - .execute(() -> deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier)); |
| 141 | + .execute(ActionRunnable.wrap(listener, l -> deleteExpiredData(request, dataRemovers, l, isTimedOutSupplier))); |
141 | 142 | } else {
|
142 |
| - jobConfigProvider.expandJobs(request.getJobId(), false, true, null, ActionListener.wrap(jobBuilders -> { |
143 |
| - threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { |
144 |
| - List<Job> jobs = jobBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); |
145 |
| - String[] jobIds = jobs.stream().map(Job::getId).toArray(String[]::new); |
146 |
| - request.setExpandedJobIds(jobIds); |
147 |
| - List<MlDataRemover> dataRemovers = createDataRemovers(jobs, taskId, anomalyDetectionAuditor); |
148 |
| - deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier); |
149 |
| - }); |
150 |
| - }, listener::onFailure)); |
| 143 | + jobConfigProvider.expandJobs( |
| 144 | + request.getJobId(), |
| 145 | + false, |
| 146 | + true, |
| 147 | + null, |
| 148 | + ActionListener.wrap( |
| 149 | + jobBuilders -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME) |
| 150 | + .execute(ActionRunnable.wrap(listener, l -> { |
| 151 | + List<Job> jobs = jobBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); |
| 152 | + String[] jobIds = jobs.stream().map(Job::getId).toArray(String[]::new); |
| 153 | + request.setExpandedJobIds(jobIds); |
| 154 | + List<MlDataRemover> dataRemovers = createDataRemovers(jobs, taskId, anomalyDetectionAuditor); |
| 155 | + deleteExpiredData(request, dataRemovers, l, isTimedOutSupplier); |
| 156 | + })), |
| 157 | + listener::onFailure |
| 158 | + ) |
| 159 | + ); |
151 | 160 | }
|
152 | 161 | }
|
153 | 162 |
|
|
0 commit comments