From d02067d50245ae91eaecb0b363c113dd6c31cc27 Mon Sep 17 00:00:00 2001 From: weizijun Date: Tue, 12 Jul 2022 16:13:11 +0800 Subject: [PATCH 01/14] support task cancel --- .../xpack/core/XPackClientPlugin.java | 2 + .../rollup/action/RollupIndexerAction.java | 14 ++ .../core/rollup/action/RollupShardStatus.java | 207 ++++++++++++++++++ .../core/rollup/action/RollupShardTask.java | 56 +++++ .../RollupShardStatusSerializingTests.java | 42 ++++ .../xpack/rollup/v2/RollupShardIndexer.java | 49 +++-- .../v2/TransportRollupIndexerAction.java | 2 + .../v2/RollupActionSingleNodeTests.java | 41 ++++ 8 files changed, 398 insertions(+), 15 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 2ea7bf2fdfafc..2d575083c28d5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -153,6 +153,7 @@ import org.elasticsearch.xpack.core.rollup.action.RollupAction; import org.elasticsearch.xpack.core.rollup.action.RollupIndexerAction; import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction; +import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus; import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction; import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction; import org.elasticsearch.xpack.core.rollup.job.RollupJob; @@ -474,6 +475,7 @@ public List getNamedWriteables() { new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJob.NAME, RollupJob::new), new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new), new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new), + new NamedWriteableRegistry.Entry(Task.Status.class, RollupShardStatus.NAME, RollupShardStatus::new), // ccr new NamedWriteableRegistry.Entry(AutoFollowMetadata.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new), new NamedWriteableRegistry.Entry(Metadata.Custom.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupIndexerAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupIndexerAction.java index 5c882f713e96a..a0c82e414bb2c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupIndexerAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupIndexerAction.java @@ -230,6 +230,20 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); request.writeTo(out); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new RollupShardTask( + id, + type, + action, + parentTaskId, + request.rollupRequest.getRollupIndex(), + request.rollupRequest.getRollupConfig(), + headers, + shardId() + ); + } } public static class ShardRollupResponse extends BroadcastShardResponse { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java new file mode 100644 index 0000000000000..fdcaabc6f4a69 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java @@ -0,0 +1,207 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.rollup.action; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.time.Instant; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; + +public class RollupShardStatus implements Task.Status { + public static final String NAME = "rollup-index-shard"; + private static final ParseField SHARD_FIELD = new ParseField("shard"); + private static final ParseField STATUS_FIELD = new ParseField("status"); + private static final ParseField START_TIME_FIELD = new ParseField("start_time"); + private static final ParseField IN_NUM_DOCS_RECEIVED_FIELD = new ParseField("in_num_docs_received"); + private static final ParseField OUT_NUM_DOCS_SENT_FIELD = new ParseField("out_num_docs_sent"); + private static final ParseField OUT_NUM_DOCS_INDEXED_FIELD = new ParseField("out_num_docs_indexed"); + private static final ParseField OUT_NUM_DOCS_FAILED_FIELD = new ParseField("out_num_docs_failed"); + + private final ShardId shardId; + private final long rollupStart; + private Status status; + private AtomicLong numReceived = new AtomicLong(0); + private AtomicLong numSent = new AtomicLong(0); + private AtomicLong numIndexed = new AtomicLong(0); + private AtomicLong numFailed = new AtomicLong(0); + + private static final ConstructingObjectParser PARSER; + static { + PARSER = new ConstructingObjectParser<>( + NAME, + args -> new RollupShardStatus( + ShardId.fromString((String) args[0]), + Status.valueOf((String) args[1]), + Instant.parse((String) args[2]).toEpochMilli(), + new AtomicLong((Long) args[3]), + new AtomicLong((Long) args[4]), + new AtomicLong((Long) args[5]), + new AtomicLong((Long) args[6]) + ) + ); + + PARSER.declareString(constructorArg(), SHARD_FIELD); + PARSER.declareString(constructorArg(), STATUS_FIELD); + PARSER.declareString(constructorArg(), START_TIME_FIELD); + PARSER.declareLong(constructorArg(), IN_NUM_DOCS_RECEIVED_FIELD); + PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_SENT_FIELD); + PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_INDEXED_FIELD); + PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_FAILED_FIELD); + } + + public RollupShardStatus(StreamInput in) throws IOException { + shardId = new ShardId(in); + status = in.readEnum(Status.class); + rollupStart = in.readLong(); + numReceived = new AtomicLong(in.readLong()); + numSent = new AtomicLong(in.readLong()); + numIndexed = new AtomicLong(in.readLong()); + numFailed = new AtomicLong(in.readLong()); + } + + public RollupShardStatus( + ShardId shardId, + Status status, + long rollupStart, + AtomicLong numReceived, + AtomicLong numSent, + AtomicLong numIndexed, + AtomicLong numFailed + ) { + this.shardId = shardId; + this.status = status; + this.rollupStart = rollupStart; + this.numReceived = numReceived; + this.numSent = numSent; + this.numIndexed = numIndexed; + this.numFailed = numFailed; + } + + public RollupShardStatus(ShardId shardId) { + status = Status.STARTED; + this.shardId = shardId; + this.rollupStart = System.currentTimeMillis(); + } + + public void init(AtomicLong numReceived, AtomicLong numSent, AtomicLong numIndexed, AtomicLong numFailed) { + this.numReceived = numReceived; + this.numSent = numSent; + this.numIndexed = numIndexed; + this.numFailed = numFailed; + } + + public Status getStatus() { + return status; + } + + public void setStatus(Status status) { + this.status = status; + } + + public static RollupShardStatus fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(SHARD_FIELD.getPreferredName(), shardId); + builder.field(STATUS_FIELD.getPreferredName(), status); + builder.field(START_TIME_FIELD.getPreferredName(), Instant.ofEpochMilli(rollupStart).toString()); + builder.field(IN_NUM_DOCS_RECEIVED_FIELD.getPreferredName(), numReceived.get()); + builder.field(OUT_NUM_DOCS_SENT_FIELD.getPreferredName(), numSent.get()); + builder.field(OUT_NUM_DOCS_INDEXED_FIELD.getPreferredName(), numIndexed.get()); + builder.field(OUT_NUM_DOCS_FAILED_FIELD.getPreferredName(), numFailed.get()); + builder.endObject(); + return builder; + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + out.writeEnum(status); + out.writeLong(rollupStart); + out.writeLong(numReceived.get()); + out.writeLong(numSent.get()); + out.writeLong(numIndexed.get()); + out.writeLong(numFailed.get()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RollupShardStatus that = (RollupShardStatus) o; + return rollupStart == that.rollupStart + && Objects.equals(shardId.getIndexName(), that.shardId.getIndexName()) + && Objects.equals(shardId.id(), that.shardId.id()) + && status == that.status + && Objects.equals(numReceived.get(), that.numReceived.get()) + && Objects.equals(numSent.get(), that.numSent.get()) + && Objects.equals(numIndexed.get(), that.numIndexed.get()) + && Objects.equals(numFailed.get(), that.numFailed.get()); + } + + @Override + public int hashCode() { + return Objects.hash( + shardId.getIndexName(), + shardId.id(), + rollupStart, + status, + numReceived.get(), + numSent.get(), + numIndexed.get(), + numFailed.get() + ); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + public enum Status { + STARTED, + FINISHED, + ABORT + } + + public void setNumSent(AtomicLong numSent) { + this.numSent = numSent; + } + + public void setNumIndexed(AtomicLong numIndexed) { + this.numIndexed = numIndexed; + } + + public void setNumFailed(AtomicLong numFailed) { + this.numFailed = numFailed; + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java new file mode 100644 index 0000000000000..fc48b1d8a3e38 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.rollup.action; + +import java.util.Map; + +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.rollup.RollupActionConfig; +import org.elasticsearch.xpack.core.rollup.RollupField; + +public class RollupShardTask extends CancellableTask { + private String rollupIndex; + private RollupActionConfig config; + private volatile RollupShardStatus status; + + public RollupShardTask( + long id, + String type, + String action, + TaskId parentTask, + String rollupIndex, + RollupActionConfig config, + Map headers, + ShardId shardId + ) { + super(id, type, action, RollupField.NAME + "_" + rollupIndex + "[" + shardId.id() + "]", parentTask, headers); + this.rollupIndex = rollupIndex; + this.config = config; + this.status = new RollupShardStatus(shardId); + } + + public String getRollupIndex() { + return rollupIndex; + } + + public RollupActionConfig config() { + return config; + } + + @Override + public Status getStatus() { + return status; + } + + @Override + public void onCancelled() { + status.setStatus(RollupShardStatus.Status.ABORT); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java new file mode 100644 index 0000000000000..3c05f13b82158 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.rollup.action; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +public class RollupShardStatusSerializingTests extends AbstractSerializingTestCase { + @Override + protected RollupShardStatus doParseInstance(XContentParser parser) throws IOException { + return RollupShardStatus.fromXContent(parser); + } + + @Override + protected Reader instanceReader() { + return RollupShardStatus::new; + } + + @Override + protected RollupShardStatus createTestInstance() { + RollupShardStatus rollupShardStatus = new RollupShardStatus( + new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(5)) + ); + rollupShardStatus.init( + new AtomicLong(randomNonNegativeLong()), + new AtomicLong(randomNonNegativeLong()), + new AtomicLong(randomNonNegativeLong()), + new AtomicLong(randomNonNegativeLong()) + ); + return rollupShardStatus; + } +} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java index 68e09e470cdce..da50e77da7d75 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java @@ -42,8 +42,11 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.bucket.DocCountProvider; import org.elasticsearch.search.aggregations.timeseries.TimeSeriesIndexSearcher; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.xpack.core.rollup.RollupActionConfig; import org.elasticsearch.xpack.core.rollup.action.RollupIndexerAction; +import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus; +import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus.Status; import java.io.Closeable; import java.io.IOException; @@ -85,11 +88,15 @@ class RollupShardIndexer { private final String[] metricFields; private final List metricFieldFetchers; + private final AtomicLong numReceived = new AtomicLong(); private final AtomicLong numSent = new AtomicLong(); private final AtomicLong numIndexed = new AtomicLong(); private final AtomicLong numFailed = new AtomicLong(); + private final RollupShardStatus status; + RollupShardIndexer( + RollupShardStatus status, Client client, IndexService indexService, ShardId shardId, @@ -98,6 +105,7 @@ class RollupShardIndexer { String[] dimensionFields, String[] metricFields ) { + this.status = status; this.client = client; this.indexShard = indexService.getShard(shardId.id()); this.config = config; @@ -105,6 +113,7 @@ class RollupShardIndexer { this.dimensionFields = dimensionFields; this.metricFields = metricFields; + this.status.init(numReceived, numSent, numIndexed, numFailed); this.searcher = indexShard.acquireSearcher("rollup"); Closeable toClose = searcher; try { @@ -129,8 +138,18 @@ class RollupShardIndexer { public RollupIndexerAction.ShardRollupResponse execute() throws IOException { BulkProcessor bulkProcessor = createBulkProcessor(); try (searcher; bulkProcessor) { - // TODO: add cancellations - final TimeSeriesIndexSearcher timeSeriesSearcher = new TimeSeriesIndexSearcher(searcher, List.of()); + final TimeSeriesIndexSearcher timeSeriesSearcher = new TimeSeriesIndexSearcher(searcher, List.of(() -> { + logger.warn( + "Shard [{}] rollup abort, sent [{}], indexed [{}], failed[{}]", + indexShard.shardId(), + numSent.get(), + numIndexed.get(), + numFailed.get() + ); + if (status.getStatus() == Status.ABORT) { + throw new TaskCancelledException(format("Shard [{}] rollup abort", indexShard.shardId())); + } + })); TimeSeriesBucketCollector bucketCollector = new TimeSeriesBucketCollector(bulkProcessor); bucketCollector.preCollection(); timeSeriesSearcher.search(new MatchAllDocsQuery(), bucketCollector); @@ -138,8 +157,9 @@ public RollupIndexerAction.ShardRollupResponse execute() throws IOException { } logger.info( - "Shard {} successfully sent [{}], indexed [{}], failed [{}]", + "Shard {} successfully sent [{}], received source doc [{}], indexed rollup doc [{}], failed [{}]", indexShard.shardId(), + numReceived.get(), numSent.get(), numIndexed.get(), numFailed.get() @@ -229,6 +249,7 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag return new LeafBucketCollector() { @Override public void collect(int docId, long owningBucketOrd) throws IOException { + numReceived.incrementAndGet(); final BytesRef tsid = aggCtx.getTsid(); assert tsid != null : "Document without [" + TimeSeriesIdFieldMapper.NAME + "] field was found."; final long timestamp = aggCtx.getTimestamp(); @@ -247,18 +268,16 @@ public void collect(int docId, long owningBucketOrd) throws IOException { * - _tsid must be sorted in ascending order * - @timestamp must be sorted in descending order within the same _tsid */ - assert lastTsid == null || lastTsid.compareTo(tsid) <= 0 - : "_tsid is not sorted in ascending order: [" - + DocValueFormat.TIME_SERIES_ID.format(lastTsid) - + "] -> [" - + DocValueFormat.TIME_SERIES_ID.format(tsid) - + "]"; - assert tsid.equals(lastTsid) == false || lastTimestamp >= timestamp - : "@timestamp is not sorted in descending order: [" - + timestampFormat.format(lastTimestamp) - + "] -> [" - + timestampFormat.format(timestamp) - + "]"; + assert lastTsid == null || lastTsid.compareTo(tsid) <= 0 : "_tsid is not sorted in ascending order: [" + + DocValueFormat.TIME_SERIES_ID.format(lastTsid) + + "] -> [" + + DocValueFormat.TIME_SERIES_ID.format(tsid) + + "]"; + assert tsid.equals(lastTsid) == false || lastTimestamp >= timestamp : "@timestamp is not sorted in descending order: [" + + timestampFormat.format(lastTimestamp) + + "] -> [" + + timestampFormat.format(timestamp) + + "]"; lastTsid = BytesRef.deepCopyOf(tsid); lastTimestamp = timestamp; diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupIndexerAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupIndexerAction.java index 8a0729718e0a0..a463e5264b5f8 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupIndexerAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupIndexerAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.rollup.action.RollupIndexerAction; +import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus; import java.io.IOException; import java.util.Arrays; @@ -125,6 +126,7 @@ protected RollupIndexerAction.ShardRollupResponse shardOperation(RollupIndexerAc throws IOException { IndexService indexService = indicesService.indexService(request.shardId().getIndex()); RollupShardIndexer indexer = new RollupShardIndexer( + (RollupShardStatus) task.getStatus(), client, indexService, request.shardId(), diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java index ff3201b0c23c7..37bae2838e4fa 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java @@ -32,15 +32,19 @@ import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.util.CancellableThreads.ExecutionCancelledException; import org.elasticsearch.datastreams.DataStreamsPlugin; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; import org.elasticsearch.index.mapper.TimeSeriesParams; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; @@ -62,6 +66,10 @@ import org.elasticsearch.xpack.core.rollup.RollupActionConfig; import org.elasticsearch.xpack.core.rollup.action.RollupAction; import org.elasticsearch.xpack.core.rollup.action.RollupActionRequestValidationException; +import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus; +import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus.Status; +import org.elasticsearch.xpack.core.rollup.job.MetricConfig; +import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig; import org.elasticsearch.xpack.rollup.Rollup; import org.junit.Before; @@ -70,6 +78,7 @@ import java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; @@ -335,6 +344,38 @@ public void testRollupDatastream() throws Exception { assertFalse(indices.stream().filter(i -> i.getName().equals(sourceIndex)).toList().isEmpty()); } + public void testCancelRollupIndexer() throws IOException { + // create rollup config and index documents into source index + RollupActionConfig config = new RollupActionConfig(randomInterval()); + SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() + .startObject() + .field("@timestamp", randomDateForInterval(config.getInterval())) + .field("categorical_1", randomAlphaOfLength(1)) + .field("numeric_1", randomDouble()) + .endObject(); + bulkIndex(sourceSupplier); + + IndicesService indexServices = getInstanceFromNode(IndicesService.class); + Index srcIndex = resolveIndex(index); + IndexService indexService = indexServices.indexServiceSafe(srcIndex); + IndexShard shard = indexService.getShard(0); + + // re-use source index as temp index for test + RollupShardIndexer indexer = new RollupShardIndexer( + new RollupShardStatus(shard.shardId()), + client(), + indexService, + shard.shardId(), + config, + rollupIndex + ); + indexer.status.setStatus(Status.ABORT); + { + ExecutionCancelledException exception = expectThrows(ExecutionCancelledException.class, () -> indexer.execute()); + assertThat(exception.getMessage(), containsString("rollup cancelled")); + } + } + private DateHistogramInterval randomInterval() { return ConfigTestHelpers.randomInterval(); } From a39ae4e8822a537b1fdb711b178a7858bc162ffd Mon Sep 17 00:00:00 2001 From: weizijun Date: Wed, 13 Jul 2022 16:16:43 +0800 Subject: [PATCH 02/14] cancel task when bulk failed --- .../core/rollup/action/RollupShardStatus.java | 4 ++ .../core/rollup/action/RollupShardTask.java | 2 +- .../xpack/rollup/v2/RollupShardIndexer.java | 57 ++++++++++++------- .../v2/RollupActionSingleNodeTests.java | 28 ++++----- 4 files changed, 54 insertions(+), 37 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java index fdcaabc6f4a69..2975f97e50963 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java @@ -115,6 +115,10 @@ public void setStatus(Status status) { this.status = status; } + public void setCancelStatus() { + this.status = Status.ABORT; + } + public static RollupShardStatus fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java index fc48b1d8a3e38..fba39c9617909 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java @@ -51,6 +51,6 @@ public Status getStatus() { @Override public void onCancelled() { - status.setStatus(RollupShardStatus.Status.ABORT); + status.setCancelStatus(); } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java index 6b5de0afab146..bdb7e066be2a8 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java @@ -139,22 +139,14 @@ public RollupIndexerAction.ShardRollupResponse execute() throws IOException { long startTime = System.currentTimeMillis(); BulkProcessor bulkProcessor = createBulkProcessor(); try (searcher; bulkProcessor) { - final TimeSeriesIndexSearcher timeSeriesSearcher = new TimeSeriesIndexSearcher(searcher, List.of(() -> { - logger.warn( - "Shard [{}] rollup abort, sent [{}], indexed [{}], failed[{}]", - indexShard.shardId(), - numSent.get(), - numIndexed.get(), - numFailed.get() - ); - if (status.getStatus() == Status.ABORT) { - throw new TaskCancelledException(format("Shard [{}] rollup abort", indexShard.shardId())); - } - })); + final TimeSeriesIndexSearcher timeSeriesSearcher = new TimeSeriesIndexSearcher(searcher, List.of(() -> { checkCancelled(); })); TimeSeriesBucketCollector bucketCollector = new TimeSeriesBucketCollector(bulkProcessor); bucketCollector.preCollection(); timeSeriesSearcher.search(new MatchAllDocsQuery(), bucketCollector); bucketCollector.postCollection(); + + // check cancel after the flush all data + checkCancelled(); } logger.info( @@ -181,6 +173,19 @@ public RollupIndexerAction.ShardRollupResponse execute() throws IOException { return new RollupIndexerAction.ShardRollupResponse(indexShard.shardId(), numIndexed.get()); } + private void checkCancelled() { + if (status.getStatus() == Status.ABORT) { + logger.warn( + "Shard [{}] rollup abort, sent [{}], indexed [{}], failed[{}]", + indexShard.shardId(), + numSent.get(), + numIndexed.get(), + numFailed.get() + ); + throw new TaskCancelledException(format("Shard %s rollup cancelled", indexShard.shardId())); + } + } + private BulkProcessor createBulkProcessor() { final BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override @@ -203,6 +208,9 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon ); numFailed.addAndGet(failures.size()); logger.error("Shard [{}] failed to populate rollup index. Failures: [{}]", indexShard.shardId(), failures); + + // cancel rollup task + status.setCancelStatus(); } } @@ -212,6 +220,9 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) long items = request.numberOfActions(); numFailed.addAndGet(items); logger.error(() -> format("Shard [%s] failed to populate rollup index.", indexShard.shardId()), failure); + + // cancel rollup task + status.setCancelStatus(); } } }; @@ -277,16 +288,18 @@ public void collect(int docId, long owningBucketOrd) throws IOException { * - _tsid must be sorted in ascending order * - @timestamp must be sorted in descending order within the same _tsid */ - assert lastTsid == null || lastTsid.compareTo(tsid) <= 0 : "_tsid is not sorted in ascending order: [" - + DocValueFormat.TIME_SERIES_ID.format(lastTsid) - + "] -> [" - + DocValueFormat.TIME_SERIES_ID.format(tsid) - + "]"; - assert tsid.equals(lastTsid) == false || lastTimestamp >= timestamp : "@timestamp is not sorted in descending order: [" - + timestampFormat.format(lastTimestamp) - + "] -> [" - + timestampFormat.format(timestamp) - + "]"; + assert lastTsid == null || lastTsid.compareTo(tsid) <= 0 + : "_tsid is not sorted in ascending order: [" + + DocValueFormat.TIME_SERIES_ID.format(lastTsid) + + "] -> [" + + DocValueFormat.TIME_SERIES_ID.format(tsid) + + "]"; + assert tsid.equals(lastTsid) == false || lastTimestamp >= timestamp + : "@timestamp is not sorted in descending order: [" + + timestampFormat.format(lastTimestamp) + + "] -> [" + + timestampFormat.format(timestamp) + + "]"; lastTsid = BytesRef.deepCopyOf(tsid); lastTimestamp = timestamp; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java index 37bae2838e4fa..be37c51f619ac 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java @@ -32,7 +32,6 @@ import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; -import org.elasticsearch.common.util.CancellableThreads.ExecutionCancelledException; import org.elasticsearch.datastreams.DataStreamsPlugin; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; @@ -56,6 +55,7 @@ import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; @@ -67,9 +67,6 @@ import org.elasticsearch.xpack.core.rollup.action.RollupAction; import org.elasticsearch.xpack.core.rollup.action.RollupActionRequestValidationException; import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus; -import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus.Status; -import org.elasticsearch.xpack.core.rollup.job.MetricConfig; -import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig; import org.elasticsearch.xpack.rollup.Rollup; import org.junit.Before; @@ -78,7 +75,6 @@ import java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; @@ -349,30 +345,34 @@ public void testCancelRollupIndexer() throws IOException { RollupActionConfig config = new RollupActionConfig(randomInterval()); SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() .startObject() - .field("@timestamp", randomDateForInterval(config.getInterval())) - .field("categorical_1", randomAlphaOfLength(1)) - .field("numeric_1", randomDouble()) + .field(FIELD_TIMESTAMP, randomDateForInterval(config.getInterval())) + .field(FIELD_DIMENSION_1, randomAlphaOfLength(1)) + .field(FIELD_NUMERIC_1, randomDouble()) .endObject(); bulkIndex(sourceSupplier); + prepareSourceIndex(sourceIndex); IndicesService indexServices = getInstanceFromNode(IndicesService.class); - Index srcIndex = resolveIndex(index); + Index srcIndex = resolveIndex(sourceIndex); IndexService indexService = indexServices.indexServiceSafe(srcIndex); IndexShard shard = indexService.getShard(0); + RollupShardStatus status = new RollupShardStatus(shard.shardId()); // re-use source index as temp index for test RollupShardIndexer indexer = new RollupShardIndexer( - new RollupShardStatus(shard.shardId()), + status, client(), indexService, shard.shardId(), + rollupIndex, config, - rollupIndex + new String[] { FIELD_DIMENSION_1, FIELD_DIMENSION_2 }, + new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 } ); - indexer.status.setStatus(Status.ABORT); + status.setCancelStatus(); { - ExecutionCancelledException exception = expectThrows(ExecutionCancelledException.class, () -> indexer.execute()); - assertThat(exception.getMessage(), containsString("rollup cancelled")); + TaskCancelledException exception = expectThrows(TaskCancelledException.class, () -> indexer.execute()); + assertThat(exception.getMessage(), containsString("Shard [" + sourceIndex + "][0] rollup cancelled")); } } From fb3d7ab3e2c7e558010ea934a3a6dace4d5c37b3 Mon Sep 17 00:00:00 2001 From: weizijun Date: Wed, 13 Jul 2022 17:12:07 +0800 Subject: [PATCH 03/14] add rollup bulk failed test --- .../xpack/rollup/v2/RollupShardIndexer.java | 9 ++++-- .../v2/RollupActionSingleNodeTests.java | 32 ++++++++++++++++--- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java index bdb7e066be2a8..3930b8d92481e 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java @@ -50,6 +50,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -197,8 +198,11 @@ public void beforeBulk(long executionId, BulkRequest request) { public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { numIndexed.addAndGet(request.numberOfActions()); if (response.hasFailures()) { - Map failures = Arrays.stream(response.getItems()) - .filter(BulkItemResponse::isFailed) + List failedItems = Arrays.stream(response.getItems()) + .filter(BulkItemResponse::isFailed).collect(Collectors.toList()); + numFailed.addAndGet(failedItems.size()); + + Map failures = failedItems.stream() .collect( Collectors.toMap( BulkItemResponse::getId, @@ -206,7 +210,6 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon (msg1, msg2) -> Objects.equals(msg1, msg2) ? msg1 : msg1 + "," + msg2 ) ); - numFailed.addAndGet(failures.size()); logger.error("Shard [{}] failed to populate rollup index. Failures: [{}]", indexShard.shardId(), failures); // cancel rollup task diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java index be37c51f619ac..4a15593ba3b05 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java @@ -81,6 +81,7 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -370,10 +371,33 @@ public void testCancelRollupIndexer() throws IOException { new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 } ); status.setCancelStatus(); - { - TaskCancelledException exception = expectThrows(TaskCancelledException.class, () -> indexer.execute()); - assertThat(exception.getMessage(), containsString("Shard [" + sourceIndex + "][0] rollup cancelled")); - } + + TaskCancelledException exception = expectThrows(TaskCancelledException.class, () -> indexer.execute()); + assertThat(exception.getMessage(), equalTo("Shard [" + sourceIndex + "][0] rollup cancelled")); + } + + public void testRollupBulkFailed() throws IOException { + // create rollup config and index documents into source index + RollupActionConfig config = new RollupActionConfig(randomInterval()); + SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() + .startObject() + .field(FIELD_TIMESTAMP, randomDateForInterval(config.getInterval())) + .field(FIELD_DIMENSION_1, randomAlphaOfLength(1)) + .field(FIELD_NUMERIC_1, randomDouble()) + .endObject(); + bulkIndex(sourceSupplier); + prepareSourceIndex(sourceIndex); + + // block rollup index + assertAcked(client().admin() + .indices() + .preparePutTemplate(rollupIndex) + .setPatterns(List.of(rollupIndex)) + .setSettings(Settings.builder().put("index.blocks.write", "true").build()) + .get()); + + ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> rollup(sourceIndex, rollupIndex, config)); + assertThat(exception.getMessage(), equalTo("Unable to rollup index [" + sourceIndex + "]")); } private DateHistogramInterval randomInterval() { From ab8ca0617d2d2d908537b24f4717b2df514fe13c Mon Sep 17 00:00:00 2001 From: weizijun Date: Thu, 14 Jul 2022 16:39:08 +0800 Subject: [PATCH 04/14] when write rollup index docs failed, cancel the rollup task --- .../broadcast/TransportBroadcastAction.java | 2 +- .../core/rollup/action/RollupShardStatus.java | 6 ++- .../core/rollup/action/RollupShardTask.java | 2 +- .../xpack/rollup/v2/RollupShardIndexer.java | 21 +++++----- .../v2/TransportRollupIndexerAction.java | 38 ++++++++++++++++++- .../v2/RollupActionSingleNodeTests.java | 18 +++++---- 6 files changed, 65 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index 236bc1ea103b9..7985e940e186d 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -211,7 +211,7 @@ protected void onOperation(ShardRouting shard, int shardIndex, ShardResponse res } } - void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int shardIndex, Exception e) { + protected void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int shardIndex, Exception e) { // we set the shard failure always, even if its the first in the replication group, and the next one // will work (it will just override it...) setFailure(shardIt, shardIndex, e); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java index 2975f97e50963..3ae17b39894cc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java @@ -115,7 +115,11 @@ public void setStatus(Status status) { this.status = status; } - public void setCancelStatus() { + public boolean isCancelled() { + return this.status == Status.ABORT; + } + + public void setCancelled() { this.status = Status.ABORT; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java index fba39c9617909..cd664248f9a65 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java @@ -51,6 +51,6 @@ public Status getStatus() { @Override public void onCancelled() { - status.setCancelStatus(); + status.setCancelled(); } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java index 3930b8d92481e..dd7e3ba824603 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java @@ -46,11 +46,9 @@ import org.elasticsearch.xpack.core.rollup.RollupActionConfig; import org.elasticsearch.xpack.core.rollup.action.RollupIndexerAction; import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus; -import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus.Status; import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -145,9 +143,6 @@ public RollupIndexerAction.ShardRollupResponse execute() throws IOException { bucketCollector.preCollection(); timeSeriesSearcher.search(new MatchAllDocsQuery(), bucketCollector); bucketCollector.postCollection(); - - // check cancel after the flush all data - checkCancelled(); } logger.info( @@ -175,7 +170,7 @@ public RollupIndexerAction.ShardRollupResponse execute() throws IOException { } private void checkCancelled() { - if (status.getStatus() == Status.ABORT) { + if (status.isCancelled()) { logger.warn( "Shard [{}] rollup abort, sent [{}], indexed [{}], failed[{}]", indexShard.shardId(), @@ -199,7 +194,8 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon numIndexed.addAndGet(request.numberOfActions()); if (response.hasFailures()) { List failedItems = Arrays.stream(response.getItems()) - .filter(BulkItemResponse::isFailed).collect(Collectors.toList()); + .filter(BulkItemResponse::isFailed) + .collect(Collectors.toList()); numFailed.addAndGet(failedItems.size()); Map failures = failedItems.stream() @@ -213,7 +209,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon logger.error("Shard [{}] failed to populate rollup index. Failures: [{}]", indexShard.shardId(), failures); // cancel rollup task - status.setCancelStatus(); + status.setCancelled(); } } @@ -225,7 +221,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) logger.error(() -> format("Shard [%s] failed to populate rollup index.", indexShard.shardId()), failure); // cancel rollup task - status.setCancelStatus(); + status.setCancelled(); } } }; @@ -353,7 +349,8 @@ private void indexBucket(Map doc) { @Override public void preCollection() throws IOException { - // no-op + // check cancel when start running + checkCancelled(); } @Override @@ -364,6 +361,10 @@ public void postCollection() throws IOException { indexBucket(doc); } bulkProcessor.flush(); + + // check cancel after the flush all data + checkCancelled(); + logger.info("Shard {} processed [{}] docs, created [{}] rollup buckets", indexShard.shardId(), docsProcessed, bucketsCreated); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupIndexerAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupIndexerAction.java index a463e5264b5f8..d95cc12944492 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupIndexerAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupIndexerAction.java @@ -9,6 +9,8 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; import org.elasticsearch.client.internal.Client; @@ -23,9 +25,11 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexService; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.rollup.action.RollupIndexerAction; @@ -51,6 +55,7 @@ public class TransportRollupIndexerAction extends TransportBroadcastAction< private final Client client; private final ClusterService clusterService; private final IndicesService indicesService; + private final TransportCancelTasksAction cancelTasksAction; @Inject public TransportRollupIndexerAction( @@ -59,7 +64,8 @@ public TransportRollupIndexerAction( TransportService transportService, IndicesService indicesService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + TransportCancelTasksAction cancelTasksAction ) { super( RollupIndexerAction.NAME, @@ -74,6 +80,7 @@ public TransportRollupIndexerAction( this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN); this.clusterService = clusterService; this.indicesService = indicesService; + this.cancelTasksAction = cancelTasksAction; } @Override @@ -171,9 +178,12 @@ protected RollupIndexerAction.Response newResponse( private class Async extends AsyncBroadcastAction { private final RollupIndexerAction.Request request; private final ActionListener listener; + private final Task task; + private boolean hasCancelled = false; protected Async(Task task, RollupIndexerAction.Request request, ActionListener listener) { super(task, request, listener); + this.task = task; this.request = request; this.listener = listener; } @@ -187,5 +197,31 @@ protected void finishHim() { listener.onFailure(e); } } + + @Override + protected void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int shardIndex, Exception e) { + cancelOtherShardIndexers(); + super.onOperation(shard, shardIt, shardIndex, e); + } + + private void cancelOtherShardIndexers() { + if (false == hasCancelled) { + cancelTasksAction.execute( + task, + new CancelTasksRequest().setTargetParentTaskId(new TaskId(clusterService.localNode().getId(), task.getId())), + ActionListener.wrap(r -> { + logger.info("[{}] rollup cancel other shard indexers", request.getRollupRequest().getSourceIndex()); + hasCancelled = true; + }, + e -> { + logger.warn( + () -> "[" + request.getRollupRequest().getSourceIndex() + "] rollup cancel other shard indexers failed", + e + ); + } + ) + ); + } + } } } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java index 4a15593ba3b05..8301feee5a238 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java @@ -120,7 +120,7 @@ public void setup() { rollupIndex = "rollup-" + sourceIndex; startTime = randomLongBetween(946769284000L, 1607470084000L); // random date between 2000-2020 docCount = randomIntBetween(10, 9000); - numOfShards = randomIntBetween(1, 4); + numOfShards = 4;// randomIntBetween(1, 4); numOfReplicas = 0; // Since this is a single node, we cannot have replicas // Values for keyword dimensions @@ -370,7 +370,7 @@ public void testCancelRollupIndexer() throws IOException { new String[] { FIELD_DIMENSION_1, FIELD_DIMENSION_2 }, new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 } ); - status.setCancelStatus(); + status.setCancelled(); TaskCancelledException exception = expectThrows(TaskCancelledException.class, () -> indexer.execute()); assertThat(exception.getMessage(), equalTo("Shard [" + sourceIndex + "][0] rollup cancelled")); @@ -389,12 +389,14 @@ public void testRollupBulkFailed() throws IOException { prepareSourceIndex(sourceIndex); // block rollup index - assertAcked(client().admin() - .indices() - .preparePutTemplate(rollupIndex) - .setPatterns(List.of(rollupIndex)) - .setSettings(Settings.builder().put("index.blocks.write", "true").build()) - .get()); + assertAcked( + client().admin() + .indices() + .preparePutTemplate(rollupIndex) + .setPatterns(List.of(rollupIndex)) + .setSettings(Settings.builder().put("index.blocks.write", "true").build()) + .get() + ); ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> rollup(sourceIndex, rollupIndex, config)); assertThat(exception.getMessage(), equalTo("Unable to rollup index [" + sourceIndex + "]")); From b7dfa21216976fe67cfa01d6950ec1775d0bf15a Mon Sep 17 00:00:00 2001 From: weizijun Date: Thu, 14 Jul 2022 16:42:06 +0800 Subject: [PATCH 05/14] support task cancel --- .../xpack/core/rollup/action/RollupShardStatus.java | 8 ++------ .../elasticsearch/xpack/rollup/v2/RollupShardIndexer.java | 2 ++ 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java index 3ae17b39894cc..727dd9e1ba72c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java @@ -107,12 +107,8 @@ public void init(AtomicLong numReceived, AtomicLong numSent, AtomicLong numIndex this.numFailed = numFailed; } - public Status getStatus() { - return status; - } - - public void setStatus(Status status) { - this.status = status; + public void setFinished() { + this.status = Status.FINISHED; } public boolean isCancelled() { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java index dd7e3ba824603..eafe931be8375 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java @@ -166,6 +166,8 @@ public RollupIndexerAction.ShardRollupResponse execute() throws IOException { + "]." ); } + + status.setFinished(); return new RollupIndexerAction.ShardRollupResponse(indexShard.shardId(), numIndexed.get()); } From d15cb1c5db156c4daefffecd360231968174d7f2 Mon Sep 17 00:00:00 2001 From: weizijun Date: Fri, 22 Jul 2022 11:09:00 +0800 Subject: [PATCH 06/14] improve --- .../xpack/core/rollup/action/RollupShardTask.java | 4 ++-- .../xpack/rollup/v2/RollupActionSingleNodeTests.java | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java index cd664248f9a65..522c6e344e03e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java @@ -7,14 +7,14 @@ package org.elasticsearch.xpack.core.rollup.action; -import java.util.Map; - import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.rollup.RollupActionConfig; import org.elasticsearch.xpack.core.rollup.RollupField; +import java.util.Map; + public class RollupShardTask extends CancellableTask { private String rollupIndex; private RollupActionConfig config; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java index 8301feee5a238..b5c1bd2274638 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java @@ -120,7 +120,7 @@ public void setup() { rollupIndex = "rollup-" + sourceIndex; startTime = randomLongBetween(946769284000L, 1607470084000L); // random date between 2000-2020 docCount = randomIntBetween(10, 9000); - numOfShards = 4;// randomIntBetween(1, 4); + numOfShards = randomIntBetween(1, 4); numOfReplicas = 0; // Since this is a single node, we cannot have replicas // Values for keyword dimensions @@ -356,7 +356,8 @@ public void testCancelRollupIndexer() throws IOException { IndicesService indexServices = getInstanceFromNode(IndicesService.class); Index srcIndex = resolveIndex(sourceIndex); IndexService indexService = indexServices.indexServiceSafe(srcIndex); - IndexShard shard = indexService.getShard(0); + int shardNum = randomIntBetween(0, numOfShards - 1); + IndexShard shard = indexService.getShard(shardNum); RollupShardStatus status = new RollupShardStatus(shard.shardId()); // re-use source index as temp index for test @@ -373,7 +374,7 @@ public void testCancelRollupIndexer() throws IOException { status.setCancelled(); TaskCancelledException exception = expectThrows(TaskCancelledException.class, () -> indexer.execute()); - assertThat(exception.getMessage(), equalTo("Shard [" + sourceIndex + "][0] rollup cancelled")); + assertThat(exception.getMessage(), equalTo("Shard [" + sourceIndex + "][" + shardNum + "] rollup cancelled")); } public void testRollupBulkFailed() throws IOException { From 1d28c3bc0df10dc7aa29ff10507bf4259ce04253 Mon Sep 17 00:00:00 2001 From: weizijun Date: Thu, 3 Nov 2022 14:34:09 +0800 Subject: [PATCH 07/14] fixup --- .../core/downsample/RollupIndexerAction.java | 5 +++-- .../xpack/core/rollup/action/RollupShardTask.java | 8 ++++---- .../action/RollupShardStatusSerializingTests.java | 4 ++-- .../xpack/downsample/RollupShardIndexer.java | 1 + .../DownsampleActionSingleNodeTests.java | 15 +++++++-------- 5 files changed, 17 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/RollupIndexerAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/RollupIndexerAction.java index bc833b74ae2f4..166d681c5230c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/RollupIndexerAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/RollupIndexerAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; import java.io.IOException; import java.util.Arrays; @@ -266,8 +267,8 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, type, action, parentTaskId, - request.rollupRequest.getRollupIndex(), - request.rollupRequest.getRollupConfig(), + request.rollupRequest.getSourceIndex(), + request.rollupRequest.getDownsampleConfig(), headers, shardId() ); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java index 522c6e344e03e..b099e71223a97 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java @@ -10,14 +10,14 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.xpack.core.rollup.RollupActionConfig; +import org.elasticsearch.xpack.core.downsample.DownsampleConfig; import org.elasticsearch.xpack.core.rollup.RollupField; import java.util.Map; public class RollupShardTask extends CancellableTask { private String rollupIndex; - private RollupActionConfig config; + private DownsampleConfig config; private volatile RollupShardStatus status; public RollupShardTask( @@ -26,7 +26,7 @@ public RollupShardTask( String action, TaskId parentTask, String rollupIndex, - RollupActionConfig config, + DownsampleConfig config, Map headers, ShardId shardId ) { @@ -40,7 +40,7 @@ public String getRollupIndex() { return rollupIndex; } - public RollupActionConfig config() { + public DownsampleConfig config() { return config; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java index 3c05f13b82158..aa35ab324d15d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java @@ -9,13 +9,13 @@ import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.test.AbstractXContentSerializingTestCase; import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; -public class RollupShardStatusSerializingTests extends AbstractSerializingTestCase { +public class RollupShardStatusSerializingTests extends AbstractXContentSerializingTestCase { @Override protected RollupShardStatus doParseInstance(XContentParser parser) throws IOException { return RollupShardStatus.fromXContent(parser); diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java index a76a5c5407122..9297d96d2456c 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java @@ -49,6 +49,7 @@ import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.downsample.DownsampleConfig; import org.elasticsearch.xpack.core.downsample.RollupIndexerAction; +import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus; import java.io.Closeable; import java.io.IOException; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java index 6b048dc2cf5e6..d973f2236a637 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java @@ -78,10 +78,9 @@ import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; +import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus; import org.elasticsearch.xpack.ilm.IndexLifecycle; import org.elasticsearch.xpack.rollup.Rollup; -import org.elasticsearch.xpack.rollup.v2.RollupActionSingleNodeTests; -import org.elasticsearch.xpack.rollup.v2.RollupActionSingleNodeTests.SourceSupplier; import org.junit.Before; import java.io.IOException; @@ -579,8 +578,8 @@ public void testRollupDatastream() throws Exception { public void testCancelRollupIndexer() throws IOException { // create rollup config and index documents into source index - RollupActionConfig config = new RollupActionConfig(randomInterval()); - RollupActionSingleNodeTests.SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() + DownsampleConfig config = new DownsampleConfig(randomInterval()); + SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() .startObject() .field(FIELD_TIMESTAMP, randomDateForInterval(config.getInterval())) .field(FIELD_DIMENSION_1, randomAlphaOfLength(1)) @@ -605,7 +604,8 @@ public void testCancelRollupIndexer() throws IOException { rollupIndex, config, new String[] { FIELD_DIMENSION_1, FIELD_DIMENSION_2 }, - new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 } + new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, + new String[] {} ); status.setCancelled(); @@ -615,8 +615,8 @@ public void testCancelRollupIndexer() throws IOException { public void testRollupBulkFailed() throws IOException { // create rollup config and index documents into source index - RollupActionConfig config = new RollupActionConfig(randomInterval()); - RollupActionSingleNodeTests.SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() + DownsampleConfig config = new DownsampleConfig(randomInterval()); + SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() .startObject() .field(FIELD_TIMESTAMP, randomDateForInterval(config.getInterval())) .field(FIELD_DIMENSION_1, randomAlphaOfLength(1)) @@ -639,7 +639,6 @@ public void testRollupBulkFailed() throws IOException { assertThat(exception.getMessage(), equalTo("Unable to rollup index [" + sourceIndex + "]")); } - private DateHistogramInterval randomInterval() { return ConfigTestHelpers.randomInterval(); } From 558bf0a68ebb3e17660108a4175b3500d16b8227 Mon Sep 17 00:00:00 2001 From: weizijun Date: Mon, 7 Nov 2022 16:20:04 +0800 Subject: [PATCH 08/14] improve --- .../core/rollup/action/RollupShardStatus.java | 55 ++----------------- .../core/rollup/action/RollupShardTask.java | 26 +++++++-- .../RollupShardStatusSerializingTests.java | 4 +- .../xpack/downsample/RollupShardIndexer.java | 30 +++++----- .../TransportRollupIndexerAction.java | 5 +- .../DownsampleActionSingleNodeTests.java | 20 +++++-- 6 files changed, 62 insertions(+), 78 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java index 727dd9e1ba72c..b282cbef3947d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java @@ -27,7 +27,6 @@ public class RollupShardStatus implements Task.Status { public static final String NAME = "rollup-index-shard"; private static final ParseField SHARD_FIELD = new ParseField("shard"); - private static final ParseField STATUS_FIELD = new ParseField("status"); private static final ParseField START_TIME_FIELD = new ParseField("start_time"); private static final ParseField IN_NUM_DOCS_RECEIVED_FIELD = new ParseField("in_num_docs_received"); private static final ParseField OUT_NUM_DOCS_SENT_FIELD = new ParseField("out_num_docs_sent"); @@ -36,11 +35,10 @@ public class RollupShardStatus implements Task.Status { private final ShardId shardId; private final long rollupStart; - private Status status; - private AtomicLong numReceived = new AtomicLong(0); - private AtomicLong numSent = new AtomicLong(0); - private AtomicLong numIndexed = new AtomicLong(0); - private AtomicLong numFailed = new AtomicLong(0); + private final AtomicLong numReceived; + private final AtomicLong numSent; + private final AtomicLong numIndexed; + private final AtomicLong numFailed; private static final ConstructingObjectParser PARSER; static { @@ -48,7 +46,6 @@ public class RollupShardStatus implements Task.Status { NAME, args -> new RollupShardStatus( ShardId.fromString((String) args[0]), - Status.valueOf((String) args[1]), Instant.parse((String) args[2]).toEpochMilli(), new AtomicLong((Long) args[3]), new AtomicLong((Long) args[4]), @@ -58,7 +55,6 @@ public class RollupShardStatus implements Task.Status { ); PARSER.declareString(constructorArg(), SHARD_FIELD); - PARSER.declareString(constructorArg(), STATUS_FIELD); PARSER.declareString(constructorArg(), START_TIME_FIELD); PARSER.declareLong(constructorArg(), IN_NUM_DOCS_RECEIVED_FIELD); PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_SENT_FIELD); @@ -68,7 +64,6 @@ public class RollupShardStatus implements Task.Status { public RollupShardStatus(StreamInput in) throws IOException { shardId = new ShardId(in); - status = in.readEnum(Status.class); rollupStart = in.readLong(); numReceived = new AtomicLong(in.readLong()); numSent = new AtomicLong(in.readLong()); @@ -78,7 +73,6 @@ public RollupShardStatus(StreamInput in) throws IOException { public RollupShardStatus( ShardId shardId, - Status status, long rollupStart, AtomicLong numReceived, AtomicLong numSent, @@ -86,7 +80,6 @@ public RollupShardStatus( AtomicLong numFailed ) { this.shardId = shardId; - this.status = status; this.rollupStart = rollupStart; this.numReceived = numReceived; this.numSent = numSent; @@ -94,31 +87,15 @@ public RollupShardStatus( this.numFailed = numFailed; } - public RollupShardStatus(ShardId shardId) { - status = Status.STARTED; + public RollupShardStatus(ShardId shardId, AtomicLong numReceived, AtomicLong numSent, AtomicLong numIndexed, AtomicLong numFailed) { this.shardId = shardId; this.rollupStart = System.currentTimeMillis(); - } - - public void init(AtomicLong numReceived, AtomicLong numSent, AtomicLong numIndexed, AtomicLong numFailed) { this.numReceived = numReceived; this.numSent = numSent; this.numIndexed = numIndexed; this.numFailed = numFailed; } - public void setFinished() { - this.status = Status.FINISHED; - } - - public boolean isCancelled() { - return this.status == Status.ABORT; - } - - public void setCancelled() { - this.status = Status.ABORT; - } - public static RollupShardStatus fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } @@ -127,7 +104,6 @@ public static RollupShardStatus fromXContent(XContentParser parser) throws IOExc public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(SHARD_FIELD.getPreferredName(), shardId); - builder.field(STATUS_FIELD.getPreferredName(), status); builder.field(START_TIME_FIELD.getPreferredName(), Instant.ofEpochMilli(rollupStart).toString()); builder.field(IN_NUM_DOCS_RECEIVED_FIELD.getPreferredName(), numReceived.get()); builder.field(OUT_NUM_DOCS_SENT_FIELD.getPreferredName(), numSent.get()); @@ -145,7 +121,6 @@ public String getWriteableName() { @Override public void writeTo(StreamOutput out) throws IOException { shardId.writeTo(out); - out.writeEnum(status); out.writeLong(rollupStart); out.writeLong(numReceived.get()); out.writeLong(numSent.get()); @@ -165,7 +140,6 @@ public boolean equals(Object o) { return rollupStart == that.rollupStart && Objects.equals(shardId.getIndexName(), that.shardId.getIndexName()) && Objects.equals(shardId.id(), that.shardId.id()) - && status == that.status && Objects.equals(numReceived.get(), that.numReceived.get()) && Objects.equals(numSent.get(), that.numSent.get()) && Objects.equals(numIndexed.get(), that.numIndexed.get()) @@ -178,7 +152,6 @@ public int hashCode() { shardId.getIndexName(), shardId.id(), rollupStart, - status, numReceived.get(), numSent.get(), numIndexed.get(), @@ -190,22 +163,4 @@ public int hashCode() { public String toString() { return Strings.toString(this); } - - public enum Status { - STARTED, - FINISHED, - ABORT - } - - public void setNumSent(AtomicLong numSent) { - this.numSent = numSent; - } - - public void setNumIndexed(AtomicLong numIndexed) { - this.numIndexed = numIndexed; - } - - public void setNumFailed(AtomicLong numFailed) { - this.numFailed = numFailed; - } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java index b099e71223a97..7ffa6d0a1d436 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java @@ -14,11 +14,16 @@ import org.elasticsearch.xpack.core.rollup.RollupField; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; public class RollupShardTask extends CancellableTask { private String rollupIndex; private DownsampleConfig config; - private volatile RollupShardStatus status; + private final RollupShardStatus status; + private final AtomicLong numReceived = new AtomicLong(0); + private final AtomicLong numSent = new AtomicLong(0); + private final AtomicLong numIndexed = new AtomicLong(0); + private final AtomicLong numFailed = new AtomicLong(0); public RollupShardTask( long id, @@ -33,7 +38,7 @@ public RollupShardTask( super(id, type, action, RollupField.NAME + "_" + rollupIndex + "[" + shardId.id() + "]", parentTask, headers); this.rollupIndex = rollupIndex; this.config = config; - this.status = new RollupShardStatus(shardId); + this.status = new RollupShardStatus(shardId, numReceived, numSent, numIndexed, numFailed); } public String getRollupIndex() { @@ -49,8 +54,19 @@ public Status getStatus() { return status; } - @Override - public void onCancelled() { - status.setCancelled(); + public AtomicLong getNumReceived() { + return numReceived; + } + + public AtomicLong getNumSent() { + return numSent; + } + + public AtomicLong getNumIndexed() { + return numIndexed; + } + + public AtomicLong getNumFailed() { + return numFailed; } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java index aa35ab324d15d..40f1cd000e722 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java @@ -29,9 +29,7 @@ protected Reader instanceReader() { @Override protected RollupShardStatus createTestInstance() { RollupShardStatus rollupShardStatus = new RollupShardStatus( - new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(5)) - ); - rollupShardStatus.init( + new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(5)), new AtomicLong(randomNonNegativeLong()), new AtomicLong(randomNonNegativeLong()), new AtomicLong(randomNonNegativeLong()), diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java index 9297d96d2456c..636ab5acfed50 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java @@ -49,7 +49,7 @@ import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.downsample.DownsampleConfig; import org.elasticsearch.xpack.core.downsample.RollupIndexerAction; -import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus; +import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; import java.io.Closeable; import java.io.IOException; @@ -92,15 +92,16 @@ class RollupShardIndexer { private final String[] metricFields; private final String[] labelFields; private final Map fieldValueFetchers; - private final AtomicLong numReceived = new AtomicLong(); - private final AtomicLong numSent = new AtomicLong(); - private final AtomicLong numIndexed = new AtomicLong(); - private final AtomicLong numFailed = new AtomicLong(); + private final AtomicLong numReceived; + private final AtomicLong numSent; + private final AtomicLong numIndexed; + private final AtomicLong numFailed; - private final RollupShardStatus status; + private final RollupShardTask task; + private volatile boolean abort = false; RollupShardIndexer( - RollupShardStatus status, + RollupShardTask task, Client client, IndexService indexService, ShardId shardId, @@ -110,7 +111,11 @@ class RollupShardIndexer { String[] metricFields, String[] labelFields ) { - this.status = status; + this.task = task; + this.numReceived = task.getNumReceived(); + this.numSent = task.getNumSent(); + this.numIndexed = task.getNumIndexed(); + this.numFailed = task.getNumFailed(); this.client = client; this.indexShard = indexService.getShard(shardId.id()); this.config = config; @@ -118,8 +123,6 @@ class RollupShardIndexer { this.dimensionFields = dimensionFields; this.metricFields = metricFields; this.labelFields = labelFields; - - this.status.init(numReceived, numSent, numIndexed, numFailed); this.searcher = indexShard.acquireSearcher("downsampling"); Closeable toClose = searcher; try { @@ -174,12 +177,11 @@ public RollupIndexerAction.ShardRollupResponse execute() throws IOException { ); } - status.setFinished(); return new RollupIndexerAction.ShardRollupResponse(indexShard.shardId(), numIndexed.get()); } private void checkCancelled() { - if (status.isCancelled()) { + if (task.isCancelled() || abort) { logger.warn( "Shard [{}] rollup abort, sent [{}], indexed [{}], failed[{}]", indexShard.shardId(), @@ -218,7 +220,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon logger.error("Shard [{}] failed to populate rollup index. Failures: [{}]", indexShard.shardId(), failures); // cancel rollup task - status.setCancelled(); + abort = true; } } @@ -230,7 +232,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) logger.error(() -> format("Shard [%s] failed to populate rollup index.", indexShard.shardId()), failure); // cancel rollup task - status.setCancelled(); + abort = true; } } }; diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java index 598492a4f4861..0489d695d5d53 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java @@ -33,7 +33,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.downsample.RollupIndexerAction; -import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus; +import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; import java.io.IOException; import java.util.Arrays; @@ -133,7 +133,7 @@ protected RollupIndexerAction.ShardRollupResponse shardOperation(RollupIndexerAc throws IOException { IndexService indexService = indicesService.indexService(request.shardId().getIndex()); RollupShardIndexer indexer = new RollupShardIndexer( - (RollupShardStatus) task.getStatus(), + (RollupShardTask) task, client, indexService, request.shardId(), @@ -201,6 +201,7 @@ protected void finishHim() { @Override protected void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int shardIndex, Exception e) { + // when this shard operation failed, cancel other shard operations cancelOtherShardIndexers(); super.onOperation(shard, shardIt, shardIndex, e); } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java index d973f2236a637..f4d06397da0fd 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java @@ -67,7 +67,9 @@ import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tasks.TaskCancelHelper; import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; @@ -78,7 +80,7 @@ import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; -import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus; +import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; import org.elasticsearch.xpack.ilm.IndexLifecycle; import org.elasticsearch.xpack.rollup.Rollup; import org.junit.Before; @@ -100,6 +102,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static java.util.Collections.emptyMap; import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -593,11 +596,21 @@ public void testCancelRollupIndexer() throws IOException { IndexService indexService = indexServices.indexServiceSafe(srcIndex); int shardNum = randomIntBetween(0, numOfShards - 1); IndexShard shard = indexService.getShard(shardNum); - RollupShardStatus status = new RollupShardStatus(shard.shardId()); + RollupShardTask task = new RollupShardTask( + randomLong(), + "rollup", + "action", + TaskId.EMPTY_TASK_ID, + rollupIndex, + config, + emptyMap(), + shard.shardId() + ); + TaskCancelHelper.cancel(task, "test cancel"); // re-use source index as temp index for test RollupShardIndexer indexer = new RollupShardIndexer( - status, + task, client(), indexService, shard.shardId(), @@ -607,7 +620,6 @@ public void testCancelRollupIndexer() throws IOException { new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, new String[] {} ); - status.setCancelled(); TaskCancelledException exception = expectThrows(TaskCancelledException.class, () -> indexer.execute()); assertThat(exception.getMessage(), equalTo("Shard [" + sourceIndex + "][" + shardNum + "] rollup cancelled")); From 1fae1d6c5d73120399e7bf6b01a2e08cd5048c14 Mon Sep 17 00:00:00 2001 From: weizijun Date: Tue, 15 Nov 2022 15:48:03 +0800 Subject: [PATCH 09/14] improve --- .../xpack/core/rollup/action/RollupShardStatus.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java index b282cbef3947d..69067485770a0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java @@ -87,15 +87,6 @@ public RollupShardStatus( this.numFailed = numFailed; } - public RollupShardStatus(ShardId shardId, AtomicLong numReceived, AtomicLong numSent, AtomicLong numIndexed, AtomicLong numFailed) { - this.shardId = shardId; - this.rollupStart = System.currentTimeMillis(); - this.numReceived = numReceived; - this.numSent = numSent; - this.numIndexed = numIndexed; - this.numFailed = numFailed; - } - public static RollupShardStatus fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } From c8e15847fc9c2a5b0f095db25098ee4791a56313 Mon Sep 17 00:00:00 2001 From: weizijun Date: Wed, 16 Nov 2022 16:45:10 +0800 Subject: [PATCH 10/14] improve --- .../xpack/core/rollup/action/RollupShardTask.java | 12 +++++++----- .../action/RollupShardStatusSerializingTests.java | 1 + .../downsample/TransportRollupIndexerAction.java | 15 +++++++++++++-- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java index 7ffa6d0a1d436..f2a7c117be08d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java @@ -17,9 +17,10 @@ import java.util.concurrent.atomic.AtomicLong; public class RollupShardTask extends CancellableTask { - private String rollupIndex; - private DownsampleConfig config; - private final RollupShardStatus status; + private final String rollupIndex; + private final DownsampleConfig config; + private final ShardId shardId; + private final long rollupStartTime; private final AtomicLong numReceived = new AtomicLong(0); private final AtomicLong numSent = new AtomicLong(0); private final AtomicLong numIndexed = new AtomicLong(0); @@ -38,7 +39,8 @@ public RollupShardTask( super(id, type, action, RollupField.NAME + "_" + rollupIndex + "[" + shardId.id() + "]", parentTask, headers); this.rollupIndex = rollupIndex; this.config = config; - this.status = new RollupShardStatus(shardId, numReceived, numSent, numIndexed, numFailed); + this.shardId = shardId; + this.rollupStartTime = System.currentTimeMillis(); } public String getRollupIndex() { @@ -51,7 +53,7 @@ public DownsampleConfig config() { @Override public Status getStatus() { - return status; + return new RollupShardStatus(shardId, rollupStartTime, numReceived, numSent, numIndexed, numFailed); } public AtomicLong getNumReceived() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java index 40f1cd000e722..c8d4bbf6bc681 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java @@ -30,6 +30,7 @@ protected Reader instanceReader() { protected RollupShardStatus createTestInstance() { RollupShardStatus rollupShardStatus = new RollupShardStatus( new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(5)), + randomMillisUpToYear9999(), new AtomicLong(randomNonNegativeLong()), new AtomicLong(randomNonNegativeLong()), new AtomicLong(randomNonNegativeLong()), diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java index 0489d695d5d53..d8388857b6e31 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.PlainShardIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; @@ -37,6 +38,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.concurrent.atomic.AtomicReferenceArray; import static org.elasticsearch.xpack.rollup.Rollup.TASK_THREAD_POOL_NAME; @@ -180,7 +182,7 @@ private class Async extends AsyncBroadcastAction { private final RollupIndexerAction.Request request; private final ActionListener listener; private final Task task; - private boolean hasCancelled = false; + private volatile boolean hasCancelled = false; protected Async(Task task, RollupIndexerAction.Request request, ActionListener listener) { super(task, request, listener); @@ -203,7 +205,8 @@ protected void finishHim() { protected void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int shardIndex, Exception e) { // when this shard operation failed, cancel other shard operations cancelOtherShardIndexers(); - super.onOperation(shard, shardIt, shardIndex, e); + // to avoid retry, set the shardIt to the last shard + super.onOperation(shard, new PlainShardIterator(shard.shardId(), Collections.emptyList()), shardIndex, e); } private void cancelOtherShardIndexers() { @@ -212,6 +215,14 @@ private void cancelOtherShardIndexers() { task, new CancelTasksRequest().setTargetParentTaskId(new TaskId(clusterService.localNode().getId(), task.getId())), ActionListener.wrap(r -> { + if (r.getNodeFailures().size() > 0 || r.getTaskFailures().size() > 0) { + logger.info( + "[{}] rollup cancel other shard indexers response failed, response is {}", + request.getRollupRequest().getSourceIndex(), + r + ); + return; + } logger.info("[{}] rollup cancel other shard indexers", request.getRollupRequest().getSourceIndex()); hasCancelled = true; }, From f5f544f63d4a9ffceef693c03974dc1012720379 Mon Sep 17 00:00:00 2001 From: weizijun Date: Thu, 17 Nov 2022 17:43:43 +0800 Subject: [PATCH 11/14] improve --- .../TransportRollupIndexerAction.java | 53 +++++++++---------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java index d8388857b6e31..62d1bc3847c1f 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java @@ -10,7 +10,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; import org.elasticsearch.client.internal.Client; @@ -57,7 +56,6 @@ public class TransportRollupIndexerAction extends TransportBroadcastAction< private final Client client; private final ClusterService clusterService; private final IndicesService indicesService; - private final TransportCancelTasksAction cancelTasksAction; @Inject public TransportRollupIndexerAction( @@ -66,8 +64,7 @@ public TransportRollupIndexerAction( TransportService transportService, IndicesService indicesService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, - TransportCancelTasksAction cancelTasksAction + IndexNameExpressionResolver indexNameExpressionResolver ) { super( RollupIndexerAction.NAME, @@ -82,7 +79,6 @@ public TransportRollupIndexerAction( this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN); this.clusterService = clusterService; this.indicesService = indicesService; - this.cancelTasksAction = cancelTasksAction; } @Override @@ -211,29 +207,30 @@ protected void onOperation(@Nullable ShardRouting shard, final ShardIterator sha private void cancelOtherShardIndexers() { if (false == hasCancelled) { - cancelTasksAction.execute( - task, - new CancelTasksRequest().setTargetParentTaskId(new TaskId(clusterService.localNode().getId(), task.getId())), - ActionListener.wrap(r -> { - if (r.getNodeFailures().size() > 0 || r.getTaskFailures().size() > 0) { - logger.info( - "[{}] rollup cancel other shard indexers response failed, response is {}", - request.getRollupRequest().getSourceIndex(), - r - ); - return; - } - logger.info("[{}] rollup cancel other shard indexers", request.getRollupRequest().getSourceIndex()); - hasCancelled = true; - }, - e -> { - logger.warn( - () -> "[" + request.getRollupRequest().getSourceIndex() + "] rollup cancel other shard indexers failed", - e - ); - } - ) - ); + client.admin() + .cluster() + .cancelTasks( + new CancelTasksRequest().setTargetParentTaskId(new TaskId(clusterService.localNode().getId(), task.getId())), + ActionListener.wrap(r -> { + if (r.getNodeFailures().size() > 0 || r.getTaskFailures().size() > 0) { + logger.info( + "[{}] rollup cancel other shard indexers response failed, response is {}", + request.getRollupRequest().getSourceIndex(), + r + ); + return; + } + logger.info("[{}] rollup cancel other shard indexers", request.getRollupRequest().getSourceIndex()); + hasCancelled = true; + }, + e -> { + logger.warn( + () -> "[" + request.getRollupRequest().getSourceIndex() + "] rollup cancel other shard indexers failed", + e + ); + } + ) + ); } } } From 18f2432bb009ccd0b2423dee2df5014cdae0e5e7 Mon Sep 17 00:00:00 2001 From: weizijun Date: Fri, 18 Nov 2022 11:15:42 +0800 Subject: [PATCH 12/14] improve stats --- .../core/rollup/action/RollupShardTask.java | 32 +++++++++++---- .../xpack/downsample/RollupShardIndexer.java | 41 ++++++++----------- 2 files changed, 40 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java index f2a7c117be08d..96e1707c97bec 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java @@ -56,19 +56,35 @@ public Status getStatus() { return new RollupShardStatus(shardId, rollupStartTime, numReceived, numSent, numIndexed, numFailed); } - public AtomicLong getNumReceived() { - return numReceived; + public long getNumReceived() { + return numReceived.get(); } - public AtomicLong getNumSent() { - return numSent; + public long getNumSent() { + return numSent.get(); } - public AtomicLong getNumIndexed() { - return numIndexed; + public long getNumIndexed() { + return numIndexed.get(); } - public AtomicLong getNumFailed() { - return numFailed; + public long getNumFailed() { + return numFailed.get(); + } + + public void addNumReceived(long count) { + numReceived.addAndGet(count); + } + + public void addNumSent(long count) { + numSent.addAndGet(count); + } + + public void addNumIndexed(long count) { + numIndexed.addAndGet(count); + } + + public void addNumFailed(long count) { + numFailed.addAndGet(count); } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java index 2a06ca34ed1b3..e541775a38ba8 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java @@ -59,7 +59,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -92,10 +91,6 @@ class RollupShardIndexer { private final String[] metricFields; private final String[] labelFields; private final Map fieldValueFetchers; - private final AtomicLong numReceived; - private final AtomicLong numSent; - private final AtomicLong numIndexed; - private final AtomicLong numFailed; private final RollupShardTask task; private volatile boolean abort = false; @@ -112,10 +107,6 @@ class RollupShardIndexer { String[] labelFields ) { this.task = task; - this.numReceived = task.getNumReceived(); - this.numSent = task.getNumSent(); - this.numIndexed = task.getNumIndexed(); - this.numFailed = task.getNumFailed(); this.client = client; this.indexShard = indexService.getShard(shardId.id()); this.config = config; @@ -158,26 +149,26 @@ public RollupIndexerAction.ShardRollupResponse execute() throws IOException { logger.info( "Shard [{}] successfully sent [{}], received source doc [{}], indexed rollup doc [{}], failed [{}], took [{}]", indexShard.shardId(), - numReceived.get(), - numSent.get(), - numIndexed.get(), - numFailed.get(), + task.getNumReceived(), + task.getNumSent(), + task.getNumIndexed(), + task.getNumFailed(), TimeValue.timeValueMillis(System.currentTimeMillis() - startTime) ); - if (numIndexed.get() != numSent.get()) { + if (task.getNumIndexed() != task.getNumSent()) { throw new ElasticsearchException( "Shard [" + indexShard.shardId() + "] failed to index all rollup documents. Sent [" - + numSent.get() + + task.getNumSent() + "], indexed [" - + numIndexed.get() + + task.getNumIndexed() + "]." ); } - return new RollupIndexerAction.ShardRollupResponse(indexShard.shardId(), numIndexed.get()); + return new RollupIndexerAction.ShardRollupResponse(indexShard.shardId(),task.getNumIndexed()); } private void checkCancelled() { @@ -185,9 +176,9 @@ private void checkCancelled() { logger.warn( "Shard [{}] rollup abort, sent [{}], indexed [{}], failed[{}]", indexShard.shardId(), - numSent.get(), - numIndexed.get(), - numFailed.get() + task.getNumSent(), + task.getNumIndexed(), + task.getNumFailed() ); throw new TaskCancelledException(format("Shard %s rollup cancelled", indexShard.shardId())); } @@ -197,17 +188,17 @@ private BulkProcessor createBulkProcessor() { final BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { - numSent.addAndGet(request.numberOfActions()); + task.addNumSent(request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - numIndexed.addAndGet(request.numberOfActions()); + task.addNumIndexed(request.numberOfActions()); if (response.hasFailures()) { List failedItems = Arrays.stream(response.getItems()) .filter(BulkItemResponse::isFailed) .collect(Collectors.toList()); - numFailed.addAndGet(failedItems.size()); + task.addNumFailed(failedItems.size()); Map failures = failedItems.stream() .collect( @@ -228,7 +219,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon public void afterBulk(long executionId, BulkRequest request, Throwable failure) { if (failure != null) { long items = request.numberOfActions(); - numFailed.addAndGet(items); + task.addNumFailed(items); logger.error(() -> format("Shard [%s] failed to populate rollup index.", indexShard.shardId()), failure); // cancel rollup task @@ -268,7 +259,7 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag return new LeafBucketCollector() { @Override public void collect(int docId, long owningBucketOrd) throws IOException { - numReceived.incrementAndGet(); + task.addNumReceived(1); final BytesRef tsid = aggCtx.getTsid(); assert tsid != null : "Document without [" + TimeSeriesIdFieldMapper.NAME + "] field was found."; final int tsidOrd = aggCtx.getTsidOrd(); From ad7d1c30f518dbd5e95744ba0931051625970780 Mon Sep 17 00:00:00 2001 From: weizijun Date: Fri, 18 Nov 2022 11:25:28 +0800 Subject: [PATCH 13/14] change name --- .../elasticsearch/xpack/downsample/RollupShardIndexer.java | 2 +- .../xpack/downsample/TransportRollupIndexerAction.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java index e541775a38ba8..c42320132511a 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java @@ -168,7 +168,7 @@ public RollupIndexerAction.ShardRollupResponse execute() throws IOException { ); } - return new RollupIndexerAction.ShardRollupResponse(indexShard.shardId(),task.getNumIndexed()); + return new RollupIndexerAction.ShardRollupResponse(indexShard.shardId(), task.getNumIndexed()); } private void checkCancelled() { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java index 62d1bc3847c1f..158b1ad527966 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java @@ -178,7 +178,7 @@ private class Async extends AsyncBroadcastAction { private final RollupIndexerAction.Request request; private final ActionListener listener; private final Task task; - private volatile boolean hasCancelled = false; + private volatile boolean hasCancelledOtherShardOperations = false; protected Async(Task task, RollupIndexerAction.Request request, ActionListener listener) { super(task, request, listener); @@ -206,7 +206,7 @@ protected void onOperation(@Nullable ShardRouting shard, final ShardIterator sha } private void cancelOtherShardIndexers() { - if (false == hasCancelled) { + if (false == hasCancelledOtherShardOperations) { client.admin() .cluster() .cancelTasks( @@ -221,7 +221,7 @@ private void cancelOtherShardIndexers() { return; } logger.info("[{}] rollup cancel other shard indexers", request.getRollupRequest().getSourceIndex()); - hasCancelled = true; + hasCancelledOtherShardOperations = true; }, e -> { logger.warn( From e745cb70ed531d7591ec05dd9bf7f067a0905c71 Mon Sep 17 00:00:00 2001 From: weizijun Date: Fri, 2 Dec 2022 15:47:37 +0800 Subject: [PATCH 14/14] improve --- .../broadcast/TransportBroadcastAction.java | 2 +- .../core/rollup/action/RollupShardStatus.java | 70 +++++++------------ .../core/rollup/action/RollupShardTask.java | 2 +- .../RollupShardStatusSerializingTests.java | 9 ++- .../TransportRollupIndexerAction.java | 45 ------------ 5 files changed, 33 insertions(+), 95 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index 3e97f0ebaacb4..7e0b636d0056b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -200,7 +200,7 @@ protected void onOperation(ShardRouting shard, int shardIndex, ShardResponse res } } - protected void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int shardIndex, Exception e) { + void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int shardIndex, Exception e) { // we set the shard failure always, even if its the first in the replication group, and the next one // will work (it will just override it...) setFailure(shardIt, shardIndex, e); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java index 69067485770a0..2bbd2d96b9cea 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.time.Instant; import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; @@ -35,10 +34,10 @@ public class RollupShardStatus implements Task.Status { private final ShardId shardId; private final long rollupStart; - private final AtomicLong numReceived; - private final AtomicLong numSent; - private final AtomicLong numIndexed; - private final AtomicLong numFailed; + private final long numReceived; + private final long numSent; + private final long numIndexed; + private final long numFailed; private static final ConstructingObjectParser PARSER; static { @@ -46,11 +45,11 @@ public class RollupShardStatus implements Task.Status { NAME, args -> new RollupShardStatus( ShardId.fromString((String) args[0]), - Instant.parse((String) args[2]).toEpochMilli(), - new AtomicLong((Long) args[3]), - new AtomicLong((Long) args[4]), - new AtomicLong((Long) args[5]), - new AtomicLong((Long) args[6]) + Instant.parse((String) args[1]).toEpochMilli(), + (Long) args[2], + (Long) args[3], + (Long) args[4], + (Long) args[5] ) ); @@ -65,20 +64,13 @@ public class RollupShardStatus implements Task.Status { public RollupShardStatus(StreamInput in) throws IOException { shardId = new ShardId(in); rollupStart = in.readLong(); - numReceived = new AtomicLong(in.readLong()); - numSent = new AtomicLong(in.readLong()); - numIndexed = new AtomicLong(in.readLong()); - numFailed = new AtomicLong(in.readLong()); + numReceived = in.readLong(); + numSent = in.readLong(); + numIndexed = in.readLong(); + numFailed = in.readLong(); } - public RollupShardStatus( - ShardId shardId, - long rollupStart, - AtomicLong numReceived, - AtomicLong numSent, - AtomicLong numIndexed, - AtomicLong numFailed - ) { + public RollupShardStatus(ShardId shardId, long rollupStart, long numReceived, long numSent, long numIndexed, long numFailed) { this.shardId = shardId; this.rollupStart = rollupStart; this.numReceived = numReceived; @@ -96,10 +88,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(SHARD_FIELD.getPreferredName(), shardId); builder.field(START_TIME_FIELD.getPreferredName(), Instant.ofEpochMilli(rollupStart).toString()); - builder.field(IN_NUM_DOCS_RECEIVED_FIELD.getPreferredName(), numReceived.get()); - builder.field(OUT_NUM_DOCS_SENT_FIELD.getPreferredName(), numSent.get()); - builder.field(OUT_NUM_DOCS_INDEXED_FIELD.getPreferredName(), numIndexed.get()); - builder.field(OUT_NUM_DOCS_FAILED_FIELD.getPreferredName(), numFailed.get()); + builder.field(IN_NUM_DOCS_RECEIVED_FIELD.getPreferredName(), numReceived); + builder.field(OUT_NUM_DOCS_SENT_FIELD.getPreferredName(), numSent); + builder.field(OUT_NUM_DOCS_INDEXED_FIELD.getPreferredName(), numIndexed); + builder.field(OUT_NUM_DOCS_FAILED_FIELD.getPreferredName(), numFailed); builder.endObject(); return builder; } @@ -113,10 +105,10 @@ public String getWriteableName() { public void writeTo(StreamOutput out) throws IOException { shardId.writeTo(out); out.writeLong(rollupStart); - out.writeLong(numReceived.get()); - out.writeLong(numSent.get()); - out.writeLong(numIndexed.get()); - out.writeLong(numFailed.get()); + out.writeLong(numReceived); + out.writeLong(numSent); + out.writeLong(numIndexed); + out.writeLong(numFailed); } @Override @@ -131,23 +123,15 @@ public boolean equals(Object o) { return rollupStart == that.rollupStart && Objects.equals(shardId.getIndexName(), that.shardId.getIndexName()) && Objects.equals(shardId.id(), that.shardId.id()) - && Objects.equals(numReceived.get(), that.numReceived.get()) - && Objects.equals(numSent.get(), that.numSent.get()) - && Objects.equals(numIndexed.get(), that.numIndexed.get()) - && Objects.equals(numFailed.get(), that.numFailed.get()); + && Objects.equals(numReceived, that.numReceived) + && Objects.equals(numSent, that.numSent) + && Objects.equals(numIndexed, that.numIndexed) + && Objects.equals(numFailed, that.numFailed); } @Override public int hashCode() { - return Objects.hash( - shardId.getIndexName(), - shardId.id(), - rollupStart, - numReceived.get(), - numSent.get(), - numIndexed.get(), - numFailed.get() - ); + return Objects.hash(shardId.getIndexName(), shardId.id(), rollupStart, numReceived, numSent, numIndexed, numFailed); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java index 96e1707c97bec..ccf80afd0ae41 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java @@ -53,7 +53,7 @@ public DownsampleConfig config() { @Override public Status getStatus() { - return new RollupShardStatus(shardId, rollupStartTime, numReceived, numSent, numIndexed, numFailed); + return new RollupShardStatus(shardId, rollupStartTime, numReceived.get(), numSent.get(), numIndexed.get(), numFailed.get()); } public long getNumReceived() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java index c8d4bbf6bc681..b1a8357a57c52 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java @@ -13,7 +13,6 @@ import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; public class RollupShardStatusSerializingTests extends AbstractXContentSerializingTestCase { @Override @@ -31,10 +30,10 @@ protected RollupShardStatus createTestInstance() { RollupShardStatus rollupShardStatus = new RollupShardStatus( new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(5)), randomMillisUpToYear9999(), - new AtomicLong(randomNonNegativeLong()), - new AtomicLong(randomNonNegativeLong()), - new AtomicLong(randomNonNegativeLong()), - new AtomicLong(randomNonNegativeLong()) + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong() ); return rollupShardStatus; } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java index 158b1ad527966..be61f99916821 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java @@ -9,7 +9,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; import org.elasticsearch.client.internal.Client; @@ -19,17 +18,14 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.PlainShardIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexService; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.downsample.RollupIndexerAction; @@ -37,7 +33,6 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Collections; import java.util.concurrent.atomic.AtomicReferenceArray; import static org.elasticsearch.xpack.rollup.Rollup.TASK_THREAD_POOL_NAME; @@ -177,12 +172,9 @@ protected RollupIndexerAction.Response newResponse( private class Async extends AsyncBroadcastAction { private final RollupIndexerAction.Request request; private final ActionListener listener; - private final Task task; - private volatile boolean hasCancelledOtherShardOperations = false; protected Async(Task task, RollupIndexerAction.Request request, ActionListener listener) { super(task, request, listener); - this.task = task; this.request = request; this.listener = listener; } @@ -196,42 +188,5 @@ protected void finishHim() { listener.onFailure(e); } } - - @Override - protected void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int shardIndex, Exception e) { - // when this shard operation failed, cancel other shard operations - cancelOtherShardIndexers(); - // to avoid retry, set the shardIt to the last shard - super.onOperation(shard, new PlainShardIterator(shard.shardId(), Collections.emptyList()), shardIndex, e); - } - - private void cancelOtherShardIndexers() { - if (false == hasCancelledOtherShardOperations) { - client.admin() - .cluster() - .cancelTasks( - new CancelTasksRequest().setTargetParentTaskId(new TaskId(clusterService.localNode().getId(), task.getId())), - ActionListener.wrap(r -> { - if (r.getNodeFailures().size() > 0 || r.getTaskFailures().size() > 0) { - logger.info( - "[{}] rollup cancel other shard indexers response failed, response is {}", - request.getRollupRequest().getSourceIndex(), - r - ); - return; - } - logger.info("[{}] rollup cancel other shard indexers", request.getRollupRequest().getSourceIndex()); - hasCancelledOtherShardOperations = true; - }, - e -> { - logger.warn( - () -> "[" + request.getRollupRequest().getSourceIndex() + "] rollup cancel other shard indexers failed", - e - ); - } - ) - ); - } - } } }