Skip to content

Commit 9a91b06

Browse files
[ML] Allow analytics process to define its own progress phases (#55763)
This is a continuation of #55580. Now that we parse phase progress from the analytics process, we change `ProgressTracker` to allow custom phases between the `loading_data` and `writing_results` phases. Each `DataFrameAnalysis` may declare its own phases. This commit puts things in place for the analytics process to start reporting different phases per analysis type. However, existing behaviour is preserved, as all analyses currently declare a single `analyzing` phase.
1 parent e161757 commit 9a91b06

File tree

15 files changed

+316
-87
lines changed

15 files changed

+316
-87
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Classification.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,11 @@ public String getStateDocId(String jobId) {
346346
return jobId + STATE_DOC_ID_SUFFIX;
347347
}
348348

349+
@Override
350+
public List<String> getProgressPhases() {
351+
return Collections.singletonList("analyzing");
352+
}
353+
349354
public static String extractJobIdFromStateDoc(String stateDocId) {
350355
int suffixIndex = stateDocId.lastIndexOf(STATE_DOC_ID_SUFFIX);
351356
return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/DataFrameAnalysis.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public interface DataFrameAnalysis extends ToXContentObject, NamedWriteable {
6666
*/
6767
String getStateDocId(String jobId);
6868

69+
/**
70+
* Returns the progress phases the analysis goes through in order
71+
*/
72+
List<String> getProgressPhases();
73+
6974
/**
7075
* Summarizes information about the fields that is necessary for analysis to generate
7176
* the parameters needed for the process configuration.

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/OutlierDetection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,11 @@ public String getStateDocId(String jobId) {
249249
throw new UnsupportedOperationException("Outlier detection does not support state");
250250
}
251251

252+
@Override
253+
public List<String> getProgressPhases() {
254+
return Collections.singletonList("analyzing");
255+
}
256+
252257
public enum Method {
253258
LOF, LDOF, DISTANCE_KTH_NN, DISTANCE_KNN;
254259

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,11 @@ public String getStateDocId(String jobId) {
213213
return jobId + STATE_DOC_ID_SUFFIX;
214214
}
215215

216+
@Override
217+
public List<String> getProgressPhases() {
218+
return Collections.singletonList("analyzing");
219+
}
220+
216221
public static String extractJobIdFromStateDoc(String stateDocId) {
217222
int suffixIndex = stateDocId.lastIndexOf(STATE_DOC_ID_SUFFIX);
218223
return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@
4444
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
4545
import org.elasticsearch.xpack.core.ml.dataframe.stats.AnalysisStats;
4646
import org.elasticsearch.xpack.core.ml.dataframe.stats.Fields;
47-
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.MemoryUsage;
4847
import org.elasticsearch.xpack.core.ml.dataframe.stats.classification.ClassificationStats;
4948
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;
49+
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.MemoryUsage;
5050
import org.elasticsearch.xpack.core.ml.dataframe.stats.outlierdetection.OutlierDetectionStats;
5151
import org.elasticsearch.xpack.core.ml.dataframe.stats.regression.RegressionStats;
5252
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -55,7 +55,6 @@
5555
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
5656
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
5757
import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker;
58-
import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder;
5958
import org.elasticsearch.xpack.ml.utils.persistence.MlParserUtils;
6059

6160
import java.util.ArrayList;
@@ -105,25 +104,20 @@ protected void taskOperation(GetDataFrameAnalyticsStatsAction.Request request, D
105104
ActionListener<QueryPage<Stats>> listener) {
106105
logger.debug("Get stats for running task [{}]", task.getParams().getId());
107106

108-
ActionListener<StatsHolder> statsHolderListener = ActionListener.wrap(
109-
statsHolder -> {
107+
ActionListener<Void> reindexingProgressListener = ActionListener.wrap(
108+
aVoid -> {
110109
Stats stats = buildStats(
111110
task.getParams().getId(),
112-
statsHolder.getProgressTracker().report(),
113-
statsHolder.getDataCountsTracker().report(task.getParams().getId()),
114-
statsHolder.getMemoryUsage(),
115-
statsHolder.getAnalysisStats()
111+
task.getStatsHolder().getProgressTracker().report(),
112+
task.getStatsHolder().getDataCountsTracker().report(task.getParams().getId()),
113+
task.getStatsHolder().getMemoryUsage(),
114+
task.getStatsHolder().getAnalysisStats()
116115
);
117116
listener.onResponse(new QueryPage<>(Collections.singletonList(stats), 1,
118117
GetDataFrameAnalyticsAction.Response.RESULTS_FIELD));
119118
}, listener::onFailure
120119
);
121120

122-
ActionListener<Void> reindexingProgressListener = ActionListener.wrap(
123-
aVoid -> statsHolderListener.onResponse(task.getStatsHolder()),
124-
listener::onFailure
125-
);
126-
127121
task.updateReindexTaskProgress(reindexingProgressListener);
128122
}
129123

@@ -138,7 +132,7 @@ protected void doExecute(Task task, GetDataFrameAnalyticsStatsAction.Request req
138132
.collect(Collectors.toList());
139133
request.setExpandedIds(expandedIds);
140134
ActionListener<GetDataFrameAnalyticsStatsAction.Response> runningTasksStatsListener = ActionListener.wrap(
141-
runningTasksStatsResponse -> gatherStatsForStoppedTasks(request.getExpandedIds(), runningTasksStatsResponse,
135+
runningTasksStatsResponse -> gatherStatsForStoppedTasks(getResponse.getResources().results(), runningTasksStatsResponse,
142136
ActionListener.wrap(
143137
finalResponse -> {
144138

@@ -163,20 +157,20 @@ protected void doExecute(Task task, GetDataFrameAnalyticsStatsAction.Request req
163157
executeAsyncWithOrigin(client, ML_ORIGIN, GetDataFrameAnalyticsAction.INSTANCE, getRequest, getResponseListener);
164158
}
165159

166-
void gatherStatsForStoppedTasks(List<String> expandedIds, GetDataFrameAnalyticsStatsAction.Response runningTasksResponse,
160+
void gatherStatsForStoppedTasks(List<DataFrameAnalyticsConfig> configs, GetDataFrameAnalyticsStatsAction.Response runningTasksResponse,
167161
ActionListener<GetDataFrameAnalyticsStatsAction.Response> listener) {
168-
List<String> stoppedTasksIds = determineStoppedTasksIds(expandedIds, runningTasksResponse.getResponse().results());
169-
if (stoppedTasksIds.isEmpty()) {
162+
List<DataFrameAnalyticsConfig> stoppedConfigs = determineStoppedConfigs(configs, runningTasksResponse.getResponse().results());
163+
if (stoppedConfigs.isEmpty()) {
170164
listener.onResponse(runningTasksResponse);
171165
return;
172166
}
173167

174-
AtomicInteger counter = new AtomicInteger(stoppedTasksIds.size());
175-
AtomicArray<Stats> jobStats = new AtomicArray<>(stoppedTasksIds.size());
176-
for (int i = 0; i < stoppedTasksIds.size(); i++) {
168+
AtomicInteger counter = new AtomicInteger(stoppedConfigs.size());
169+
AtomicArray<Stats> jobStats = new AtomicArray<>(stoppedConfigs.size());
170+
for (int i = 0; i < stoppedConfigs.size(); i++) {
177171
final int slot = i;
178-
String jobId = stoppedTasksIds.get(i);
179-
searchStats(jobId, ActionListener.wrap(
172+
DataFrameAnalyticsConfig config = stoppedConfigs.get(i);
173+
searchStats(config, ActionListener.wrap(
180174
stats -> {
181175
jobStats.set(slot, stats);
182176
if (counter.decrementAndGet() == 0) {
@@ -192,23 +186,24 @@ void gatherStatsForStoppedTasks(List<String> expandedIds, GetDataFrameAnalyticsS
192186
}
193187
}
194188

195-
static List<String> determineStoppedTasksIds(List<String> expandedIds, List<Stats> runningTasksStats) {
189+
static List<DataFrameAnalyticsConfig> determineStoppedConfigs(List<DataFrameAnalyticsConfig> configs, List<Stats> runningTasksStats) {
196190
Set<String> startedTasksIds = runningTasksStats.stream().map(Stats::getId).collect(Collectors.toSet());
197-
return expandedIds.stream().filter(id -> startedTasksIds.contains(id) == false).collect(Collectors.toList());
191+
return configs.stream().filter(config -> startedTasksIds.contains(config.getId()) == false).collect(Collectors.toList());
198192
}
199193

200-
private void searchStats(String configId, ActionListener<Stats> listener) {
201-
logger.debug("[{}] Gathering stats for stopped task", configId);
194+
private void searchStats(DataFrameAnalyticsConfig config, ActionListener<Stats> listener) {
195+
logger.debug("[{}] Gathering stats for stopped task", config.getId());
202196

203-
RetrievedStatsHolder retrievedStatsHolder = new RetrievedStatsHolder();
197+
RetrievedStatsHolder retrievedStatsHolder = new RetrievedStatsHolder(
198+
ProgressTracker.fromZeroes(config.getAnalysis().getProgressPhases()).report());
204199

205200
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
206-
multiSearchRequest.add(buildStoredProgressSearch(configId));
207-
multiSearchRequest.add(buildStatsDocSearch(configId, DataCounts.TYPE_VALUE));
208-
multiSearchRequest.add(buildStatsDocSearch(configId, MemoryUsage.TYPE_VALUE));
209-
multiSearchRequest.add(buildStatsDocSearch(configId, OutlierDetectionStats.TYPE_VALUE));
210-
multiSearchRequest.add(buildStatsDocSearch(configId, ClassificationStats.TYPE_VALUE));
211-
multiSearchRequest.add(buildStatsDocSearch(configId, RegressionStats.TYPE_VALUE));
201+
multiSearchRequest.add(buildStoredProgressSearch(config.getId()));
202+
multiSearchRequest.add(buildStatsDocSearch(config.getId(), DataCounts.TYPE_VALUE));
203+
multiSearchRequest.add(buildStatsDocSearch(config.getId(), MemoryUsage.TYPE_VALUE));
204+
multiSearchRequest.add(buildStatsDocSearch(config.getId(), OutlierDetectionStats.TYPE_VALUE));
205+
multiSearchRequest.add(buildStatsDocSearch(config.getId(), ClassificationStats.TYPE_VALUE));
206+
multiSearchRequest.add(buildStatsDocSearch(config.getId(), RegressionStats.TYPE_VALUE));
212207

213208
executeAsyncWithOrigin(client, ML_ORIGIN, MultiSearchAction.INSTANCE, multiSearchRequest, ActionListener.wrap(
214209
multiSearchResponse -> {
@@ -220,7 +215,7 @@ private void searchStats(String configId, ActionListener<Stats> listener) {
220215
logger.error(
221216
new ParameterizedMessage(
222217
"[{}] Item failure encountered during multi search for request [indices={}, source={}]: {}",
223-
configId, itemRequest.indices(), itemRequest.source(), itemResponse.getFailureMessage()),
218+
config.getId(), itemRequest.indices(), itemRequest.source(), itemResponse.getFailureMessage()),
224219
itemResponse.getFailure());
225220
listener.onFailure(ExceptionsHelper.serverError(itemResponse.getFailureMessage(), itemResponse.getFailure()));
226221
return;
@@ -229,13 +224,13 @@ private void searchStats(String configId, ActionListener<Stats> listener) {
229224
if (hits.length == 0) {
230225
// Not found
231226
} else if (hits.length == 1) {
232-
parseHit(hits[0], configId, retrievedStatsHolder);
227+
parseHit(hits[0], config.getId(), retrievedStatsHolder);
233228
} else {
234229
throw ExceptionsHelper.serverError("Found [" + hits.length + "] hits when just one was requested");
235230
}
236231
}
237232
}
238-
listener.onResponse(buildStats(configId,
233+
listener.onResponse(buildStats(config.getId(),
239234
retrievedStatsHolder.progress.get(),
240235
retrievedStatsHolder.dataCounts,
241236
retrievedStatsHolder.memoryUsage,
@@ -322,9 +317,13 @@ private GetDataFrameAnalyticsStatsAction.Response.Stats buildStats(String concre
322317

323318
private static class RetrievedStatsHolder {
324319

325-
private volatile StoredProgress progress = new StoredProgress(new ProgressTracker().report());
320+
private volatile StoredProgress progress;
326321
private volatile DataCounts dataCounts;
327322
private volatile MemoryUsage memoryUsage;
328323
private volatile AnalysisStats analysisStats;
324+
325+
private RetrievedStatsHolder(List<PhaseProgress> defaultProgress) {
326+
progress = new StoredProgress(defaultProgress);
327+
}
329328
}
330329
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState current
8181
// With config in hand, determine action to take
8282
ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap(
8383
config -> {
84+
// At this point we have the config at hand and we can reset the progress tracker
85+
// to use the analysis' phases. We preserve reindexing progress so that, if reindexing
86+
// had already finished, its progress is not reset.
87+
task.getStatsHolder().resetProgressTrackerPreservingReindexingProgress(config.getAnalysis().getProgressPhases());
88+
8489
switch(currentState) {
8590
// If we are STARTED, it means the job was started because the start API was called.
8691
// We should determine the job's starting state based on its previous progress.
@@ -217,7 +222,6 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
217222
return;
218223
}
219224
task.setReindexingTaskId(null);
220-
task.setReindexingFinished();
221225
auditor.info(
222226
config.getId(),
223227
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex(),
@@ -296,6 +300,7 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi
296300
task.markAsCompleted();
297301
return;
298302
}
303+
299304
final ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, task.getParentTaskId());
300305
// Update state to ANALYZING and start process
301306
ActionListener<DataFrameDataExtractorFactory> dataExtractorFactoryListener = ActionListener.wrap(
@@ -327,8 +332,8 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi
327332

328333
ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(
329334
refreshResponse -> {
330-
// Ensure we mark reindexing is finished for the case we are recovering a task that had finished reindexing
331-
task.setReindexingFinished();
335+
// Now we can ensure reindexing progress is complete
336+
task.getStatsHolder().getProgressTracker().updateReindexingProgress(100);
332337

333338
// TODO This could fail with errors. In that case we get stuck with the copied index.
334339
// We could delete the index in case of failure or we could try building the factory before reindexing

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,9 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
6767
private final StartDataFrameAnalyticsAction.TaskParams taskParams;
6868
@Nullable
6969
private volatile Long reindexingTaskId;
70-
private volatile boolean isReindexingFinished;
7170
private volatile boolean isStopping;
7271
private volatile boolean isMarkAsCompletedCalled;
73-
private final StatsHolder statsHolder = new StatsHolder();
72+
private final StatsHolder statsHolder;
7473

7574
public DataFrameAnalyticsTask(long id, String type, String action, TaskId parentTask, Map<String, String> headers,
7675
Client client, ClusterService clusterService, DataFrameAnalyticsManager analyticsManager,
@@ -81,6 +80,7 @@ public DataFrameAnalyticsTask(long id, String type, String action, TaskId parent
8180
this.analyticsManager = Objects.requireNonNull(analyticsManager);
8281
this.auditor = Objects.requireNonNull(auditor);
8382
this.taskParams = Objects.requireNonNull(taskParams);
83+
this.statsHolder = new StatsHolder(taskParams.getProgressOnStart());
8484
}
8585

8686
public StartDataFrameAnalyticsAction.TaskParams getParams() {
@@ -92,10 +92,6 @@ public void setReindexingTaskId(Long reindexingTaskId) {
9292
this.reindexingTaskId = reindexingTaskId;
9393
}
9494

95-
public void setReindexingFinished() {
96-
isReindexingFinished = true;
97-
}
98-
9995
public boolean isStopping() {
10096
return isStopping;
10197
}
@@ -222,7 +218,7 @@ public void updateReindexTaskProgress(ActionListener<Void> listener) {
222218
// We set reindexing progress at least to 1 for a running process to be able to
223219
// distinguish a job that is running for the first time from a job that is restarting.
224220
reindexTaskProgress -> {
225-
statsHolder.getProgressTracker().reindexingPercent.set(Math.max(1, reindexTaskProgress));
221+
statsHolder.getProgressTracker().updateReindexingProgress(Math.max(1, reindexTaskProgress));
226222
listener.onResponse(null);
227223
},
228224
listener::onFailure
@@ -232,9 +228,7 @@ public void updateReindexTaskProgress(ActionListener<Void> listener) {
232228
private void getReindexTaskProgress(ActionListener<Integer> listener) {
233229
TaskId reindexTaskId = getReindexTaskId();
234230
if (reindexTaskId == null) {
235-
// The task is not present which means either it has not started yet or it finished.
236-
// We keep track of whether the task has finished so we can use that to tell whether the progress 100.
237-
listener.onResponse(isReindexingFinished ? 100 : 0);
231+
listener.onResponse(statsHolder.getProgressTracker().getReindexingProgressPercent());
238232
return;
239233
}
240234

@@ -250,8 +244,7 @@ private void getReindexTaskProgress(ActionListener<Integer> listener) {
250244
error -> {
251245
if (ExceptionsHelper.unwrapCause(error) instanceof ResourceNotFoundException) {
252246
// The task is not present which means either it has not started yet or it finished.
253-
// We keep track of whether the task has finished so we can use that to tell whether the progress 100.
254-
listener.onResponse(isReindexingFinished ? 100 : 0);
247+
listener.onResponse(statsHolder.getProgressTracker().getReindexingProgressPercent());
255248
} else {
256249
listener.onFailure(error);
257250
}
@@ -366,17 +359,10 @@ public static StartingState determineStartingState(String jobId, List<PhaseProgr
366359
LOGGER.debug("[{}] Last incomplete progress [{}, {}]", jobId, lastIncompletePhase.getPhase(),
367360
lastIncompletePhase.getProgressPercent());
368361

369-
switch (lastIncompletePhase.getPhase()) {
370-
case ProgressTracker.REINDEXING:
371-
return lastIncompletePhase.getProgressPercent() == 0 ? StartingState.FIRST_TIME : StartingState.RESUMING_REINDEXING;
372-
case ProgressTracker.LOADING_DATA:
373-
case ProgressTracker.ANALYZING:
374-
case ProgressTracker.WRITING_RESULTS:
375-
return StartingState.RESUMING_ANALYZING;
376-
default:
377-
LOGGER.warn("[{}] Unexpected progress phase [{}]", jobId, lastIncompletePhase.getPhase());
378-
return StartingState.FIRST_TIME;
362+
if (ProgressTracker.REINDEXING.equals(lastIncompletePhase.getPhase())) {
363+
return lastIncompletePhase.getProgressPercent() == 0 ? StartingState.FIRST_TIME : StartingState.RESUMING_REINDEXING;
379364
}
365+
return StartingState.RESUMING_ANALYZING;
380366
}
381367

382368
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProces
246246
}
247247
}
248248
rowsProcessed += rows.get().size();
249-
progressTracker.loadingDataPercent.set(rowsProcessed >= totalRows ? 100 : (int) (rowsProcessed * 100.0 / totalRows));
249+
progressTracker.updateLoadingDataProgress(rowsProcessed >= totalRows ? 100 : (int) (rowsProcessed * 100.0 / totalRows));
250250
}
251251
}
252252
}

0 commit comments

Comments
 (0)