From 69917d40952ddcb8298cd768452cc0bf81c24d36 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 2 May 2019 11:55:25 +0200 Subject: [PATCH 1/3] reset/clear the position after indexer is done --- .../xpack/core/indexing/AsyncTwoPhaseIndexer.java | 1 + .../dataframe/integration/DataFrameGetAndGetStatsIT.java | 8 ++++++++ .../xpack/dataframe/transforms/DataFrameIndexer.java | 9 ++++++++- 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 636a3978443e0..ec7e0de9e34fc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -313,6 +313,7 @@ private void onSearchResponse(SearchResponse searchResponse) { if (iterationResult.isDone()) { logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down."); + position.set(iterationResult.getPosition()); // execute finishing tasks onFinish(ActionListener.wrap( r -> doSaveState(finishAndSetState(), position.get(), () -> {}), diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java index 62101e4e12064..d9927cd09ed8f 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java @@ -103,6 +103,14 @@ public void testGetAndGetStats() throws Exception { stats = entityAsMap(client().performRequest(getRequest)); assertEquals(1, XContentMapValues.extractValue("count", stats)); + transformsStats = (List>)XContentMapValues.extractValue("transforms", stats); + assertEquals(1, transformsStats.size()); + Map state = (Map) XContentMapValues.extractValue("state", transformsStats.get(0)); + assertEquals(1, transformsStats.size()); + assertEquals("started", XContentMapValues.extractValue("task_state", state)); + assertEquals(null, XContentMapValues.extractValue("current_position", state)); + assertEquals(1, XContentMapValues.extractValue("checkpoint", state)); + // check all the different ways to retrieve all transforms getRequest = createRequestWithAuth("GET", DATAFRAME_ENDPOINT, authHeader); Map transforms = entityAsMap(client().performRequest(getRequest)); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index 5fde9a1cac60e..0fa68d084fe80 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -14,8 +14,8 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.common.Nullable; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.QueryBuilder; @@ -33,6 +33,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.concurrent.Executor; @@ -115,6 +116,12 @@ protected void onFinish(ActionListener listener) { @Override protected IterationResult> doProcess(SearchResponse searchResponse) { final CompositeAggregation agg = searchResponse.getAggregations().get(COMPOSITE_AGGREGATION_NAME); + + // we reached the end + if (agg.getBuckets().isEmpty()) { + return new IterationResult<>(Collections.emptyList(), null, true); + } + long docsBeforeProcess = getStats().getNumDocuments(); IterationResult> result = new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()), agg.afterKey(), From e0ff8eef81bc8c9b326121abca73fc51fd0e7dc5 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 2 May 2019 14:15:29 +0200 Subject: [PATCH 2/3] do not set position for rollup --- .../org/elasticsearch/xpack/rollup/job/RollupIndexer.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index f99d5ec5993b5..15808091fcea6 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -128,6 +128,11 @@ protected SearchRequest buildSearchRequest() { protected IterationResult> doProcess(SearchResponse searchResponse) { final CompositeAggregation response = searchResponse.getAggregations().get(AGGREGATION_NAME); + if (response.getBuckets().isEmpty()) { + // do not reset the position as we what to continue from where we stopped + return new IterationResult<>(Collections.emptyList(), getPosition(), true); + } + return new IterationResult<>( IndexerUtils.processBuckets(response, job.getConfig().getRollupIndex(), getStats(), job.getConfig().getGroupConfig(), job.getConfig().getId()), From c03b2316fb0b869ff09c1350e1ac2cafd57b4ccc Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 2 May 2019 14:23:45 +0200 Subject: [PATCH 3/3] fix typo --- .../java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index 15808091fcea6..aa6a9d4d58eee 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -129,7 +129,7 @@ protected IterationResult> doProcess(SearchResponse searchRe final CompositeAggregation response = searchResponse.getAggregations().get(AGGREGATION_NAME); if (response.getBuckets().isEmpty()) { - // do not reset the position as we what to continue from where we stopped + // do not reset the position as we want to continue from where we stopped return new IterationResult<>(Collections.emptyList(), getPosition(), true); }