Skip to content

Commit 0c03707

Browse files
author
Hendrik Muhs
committed
[ML-DataFrame] reset/clear the position after indexer is done (elastic#41736)
reset/clear the position after indexer is done
1 parent ee84038 commit 0c03707

File tree

4 files changed

+22
-0
lines changed

4 files changed

+22
-0
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

+1
Original file line number | Diff line number | Diff line change
@@ -313,6 +313,7 @@ private void onSearchResponse(SearchResponse searchResponse) {
313313
if (iterationResult.isDone()) {
314314
logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down.");
315315

316+
position.set(iterationResult.getPosition());
316317
// execute finishing tasks
317318
onFinish(ActionListener.wrap(
318319
r -> doSaveState(finishAndSetState(), position.get(), () -> {}),

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java

+8
Original file line number | Diff line number | Diff line change
@@ -103,6 +103,14 @@ public void testGetAndGetStats() throws Exception {
103103
stats = entityAsMap(client().performRequest(getRequest));
104104
assertEquals(1, XContentMapValues.extractValue("count", stats));
105105

106+
transformsStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", stats);
107+
assertEquals(1, transformsStats.size());
108+
Map<String, Object> state = (Map<String, Object>) XContentMapValues.extractValue("state", transformsStats.get(0));
109+
assertEquals(1, transformsStats.size());
110+
assertEquals("started", XContentMapValues.extractValue("task_state", state));
111+
assertEquals(null, XContentMapValues.extractValue("current_position", state));
112+
assertEquals(1, XContentMapValues.extractValue("checkpoint", state));
113+
106114
// check all the different ways to retrieve all transforms
107115
getRequest = createRequestWithAuth("GET", DATAFRAME_ENDPOINT, authHeader);
108116
Map<String, Object> transforms = entityAsMap(client().performRequest(getRequest));

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java

+8
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.search.SearchRequest;
1515
import org.elasticsearch.action.search.SearchResponse;
1616
import org.elasticsearch.action.search.ShardSearchFailure;
17+
import org.elasticsearch.common.Nullable;
1718
import org.elasticsearch.common.breaker.CircuitBreakingException;
1819
import org.elasticsearch.common.xcontent.XContentBuilder;
1920
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
@@ -31,6 +32,7 @@
3132

3233
import java.io.IOException;
3334
import java.io.UncheckedIOException;
35+
import java.util.Collections;
3436
import java.util.Map;
3537
import java.util.Objects;
3638
import java.util.concurrent.Executor;
@@ -127,6 +129,12 @@ protected void onFinish(ActionListener<Void> listener) {
127129
@Override
128130
protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchResponse) {
129131
final CompositeAggregation agg = searchResponse.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
132+
133+
// we reached the end
134+
if (agg.getBuckets().isEmpty()) {
135+
return new IterationResult<>(Collections.emptyList(), null, true);
136+
}
137+
130138
long docsBeforeProcess = getStats().getNumDocuments();
131139
IterationResult<Map<String, Object>> result = new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()),
132140
agg.afterKey(),

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java

+5
Original file line number | Diff line number | Diff line change
@@ -143,6 +143,11 @@ protected SearchRequest buildSearchRequest() {
143143
protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchResponse) {
144144
final CompositeAggregation response = searchResponse.getAggregations().get(AGGREGATION_NAME);
145145

146+
if (response.getBuckets().isEmpty()) {
147+
// do not reset the position as we want to continue from where we stopped
148+
return new IterationResult<>(Collections.emptyList(), getPosition(), true);
149+
}
150+
146151
return new IterationResult<>(
147152
IndexerUtils.processBuckets(response, job.getConfig().getRollupIndex(), getStats(),
148153
job.getConfig().getGroupConfig(), job.getConfig().getId(), upgradedDocumentID.get()),

0 commit comments

Comments
 (0)