diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index de24759941393..a793c20065511 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -151,6 +151,7 @@ import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction; import org.elasticsearch.xpack.core.rollup.action.PutRollupJobAction; import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction; +import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus; import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction; import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction; import org.elasticsearch.xpack.core.rollup.job.RollupJob; @@ -460,6 +461,7 @@ public List getNamedWriteables() { new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJob.NAME, RollupJob::new), new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new), new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new), + new NamedWriteableRegistry.Entry(Task.Status.class, RollupShardStatus.NAME, RollupShardStatus::new), // ccr new NamedWriteableRegistry.Entry(AutoFollowMetadata.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new), new NamedWriteableRegistry.Entry(Metadata.Custom.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/RollupIndexerAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/RollupIndexerAction.java index 135578c52f1dc..166d681c5230c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/RollupIndexerAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/RollupIndexerAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; import java.io.IOException; import java.util.Arrays; @@ -258,6 +259,20 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); request.writeTo(out); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new RollupShardTask( + id, + type, + action, + parentTaskId, + request.rollupRequest.getSourceIndex(), + request.rollupRequest.getDownsampleConfig(), + headers, + shardId() + ); + } } public static class ShardRollupResponse extends BroadcastShardResponse { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java new file mode 100644 index 0000000000000..2bbd2d96b9cea --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.rollup.action; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.time.Instant; +import java.util.Objects; + +import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; + +public class RollupShardStatus implements Task.Status { + public static final String NAME = "rollup-index-shard"; + private static final ParseField SHARD_FIELD = new ParseField("shard"); + private static final ParseField START_TIME_FIELD = new ParseField("start_time"); + private static final ParseField IN_NUM_DOCS_RECEIVED_FIELD = new ParseField("in_num_docs_received"); + private static final ParseField OUT_NUM_DOCS_SENT_FIELD = new ParseField("out_num_docs_sent"); + private static final ParseField OUT_NUM_DOCS_INDEXED_FIELD = new ParseField("out_num_docs_indexed"); + private static final ParseField OUT_NUM_DOCS_FAILED_FIELD = new ParseField("out_num_docs_failed"); + + private final ShardId shardId; + private final long rollupStart; + private final long numReceived; + private final long numSent; + private final long numIndexed; + private final long numFailed; + + private static final ConstructingObjectParser PARSER; + static { + PARSER = new ConstructingObjectParser<>( + NAME, + args -> new RollupShardStatus( + ShardId.fromString((String) args[0]), + Instant.parse((String) args[1]).toEpochMilli(), + (Long) args[2], + (Long) args[3], + (Long) args[4], + (Long) args[5] + ) + ); + + PARSER.declareString(constructorArg(), SHARD_FIELD); + PARSER.declareString(constructorArg(), START_TIME_FIELD); + PARSER.declareLong(constructorArg(), IN_NUM_DOCS_RECEIVED_FIELD); + PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_SENT_FIELD); + PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_INDEXED_FIELD); + PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_FAILED_FIELD); + } + + public RollupShardStatus(StreamInput in) throws IOException { + shardId = new ShardId(in); + rollupStart = in.readLong(); + numReceived = in.readLong(); + numSent = in.readLong(); + numIndexed = in.readLong(); + numFailed = in.readLong(); + } + + public RollupShardStatus(ShardId shardId, long rollupStart, long numReceived, long numSent, long numIndexed, long numFailed) { + this.shardId = shardId; + this.rollupStart = rollupStart; + this.numReceived = numReceived; + this.numSent = numSent; + this.numIndexed = numIndexed; + this.numFailed = numFailed; + } + + public static RollupShardStatus fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(SHARD_FIELD.getPreferredName(), shardId); + builder.field(START_TIME_FIELD.getPreferredName(), Instant.ofEpochMilli(rollupStart).toString()); + builder.field(IN_NUM_DOCS_RECEIVED_FIELD.getPreferredName(), numReceived); + builder.field(OUT_NUM_DOCS_SENT_FIELD.getPreferredName(), numSent); + builder.field(OUT_NUM_DOCS_INDEXED_FIELD.getPreferredName(), numIndexed); + builder.field(OUT_NUM_DOCS_FAILED_FIELD.getPreferredName(), numFailed); + builder.endObject(); + return builder; + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + out.writeLong(rollupStart); + out.writeLong(numReceived); + out.writeLong(numSent); + out.writeLong(numIndexed); + out.writeLong(numFailed); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RollupShardStatus that = (RollupShardStatus) o; + return rollupStart == that.rollupStart + && Objects.equals(shardId.getIndexName(), that.shardId.getIndexName()) + && Objects.equals(shardId.id(), that.shardId.id()) + && Objects.equals(numReceived, that.numReceived) + && Objects.equals(numSent, that.numSent) + && Objects.equals(numIndexed, that.numIndexed) + && Objects.equals(numFailed, that.numFailed); + } + + @Override + public int hashCode() { + return Objects.hash(shardId.getIndexName(), shardId.id(), rollupStart, numReceived, numSent, numIndexed, numFailed); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java new file mode 100644 index 0000000000000..ccf80afd0ae41 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.rollup.action; + +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.downsample.DownsampleConfig; +import org.elasticsearch.xpack.core.rollup.RollupField; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +public class RollupShardTask extends CancellableTask { + private final String rollupIndex; + private final DownsampleConfig config; + private final ShardId shardId; + private final long rollupStartTime; + private final AtomicLong numReceived = new AtomicLong(0); + private final AtomicLong numSent = new AtomicLong(0); + private final AtomicLong numIndexed = new AtomicLong(0); + private final AtomicLong numFailed = new AtomicLong(0); + + public RollupShardTask( + long id, + String type, + String action, + TaskId parentTask, + String rollupIndex, + DownsampleConfig config, + Map headers, + ShardId shardId + ) { + super(id, type, action, RollupField.NAME + "_" + rollupIndex + "[" + shardId.id() + "]", parentTask, headers); + this.rollupIndex = rollupIndex; + this.config = config; + this.shardId = shardId; + this.rollupStartTime = System.currentTimeMillis(); + } + + public String getRollupIndex() { + return rollupIndex; + } + + public DownsampleConfig config() { + return config; + } + + @Override + public Status getStatus() { + return new RollupShardStatus(shardId, rollupStartTime, numReceived.get(), numSent.get(), numIndexed.get(), numFailed.get()); + } + + public long getNumReceived() { + return numReceived.get(); + } + + public long getNumSent() { + return numSent.get(); + } + + public long getNumIndexed() { + return numIndexed.get(); + } + + public long getNumFailed() { + return numFailed.get(); + } + + public void addNumReceived(long count) { + numReceived.addAndGet(count); + } + + public void addNumSent(long count) { + numSent.addAndGet(count); + } + + public void addNumIndexed(long count) { + numIndexed.addAndGet(count); + } + + public void addNumFailed(long count) { + numFailed.addAndGet(count); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java new file mode 100644 index 0000000000000..b1a8357a57c52 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.rollup.action; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.AbstractXContentSerializingTestCase; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; + +public class RollupShardStatusSerializingTests extends AbstractXContentSerializingTestCase { + @Override + protected RollupShardStatus doParseInstance(XContentParser parser) throws IOException { + return RollupShardStatus.fromXContent(parser); + } + + @Override + protected Reader instanceReader() { + return RollupShardStatus::new; + } + + @Override + protected RollupShardStatus createTestInstance() { + RollupShardStatus rollupShardStatus = new RollupShardStatus( + new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(5)), + randomMillisUpToYear9999(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong() + ); + return rollupShardStatus; + } +} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java index 479e0e9cddc1c..c42320132511a 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java @@ -43,11 +43,13 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.bucket.DocCountProvider; import org.elasticsearch.search.aggregations.support.TimeSeriesIndexSearcher; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.downsample.DownsampleConfig; import org.elasticsearch.xpack.core.downsample.RollupIndexerAction; +import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; import java.io.Closeable; import java.io.IOException; @@ -57,7 +59,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -90,11 +91,12 @@ class RollupShardIndexer { private final String[] metricFields; private final String[] labelFields; private final Map fieldValueFetchers; - private final AtomicLong numSent = new AtomicLong(); - private final AtomicLong numIndexed = new AtomicLong(); - private final AtomicLong numFailed = new AtomicLong(); + + private final RollupShardTask task; + private volatile boolean abort = false; RollupShardIndexer( + RollupShardTask task, Client client, IndexService indexService, ShardId shardId, @@ -104,6 +106,7 @@ class RollupShardIndexer { String[] metricFields, String[] labelFields ) { + this.task = task; this.client = client; this.indexShard = indexService.getShard(shardId.id()); this.config = config; @@ -111,7 +114,6 @@ class RollupShardIndexer { this.dimensionFields = dimensionFields; this.metricFields = metricFields; this.labelFields = labelFields; - this.searcher = indexShard.acquireSearcher("downsampling"); Closeable toClose = searcher; try { @@ -137,8 +139,7 @@ public RollupIndexerAction.ShardRollupResponse execute() throws IOException { long startTime = System.currentTimeMillis(); BulkProcessor bulkProcessor = createBulkProcessor(); try (searcher; bulkProcessor) { - // TODO: add cancellations - final TimeSeriesIndexSearcher timeSeriesSearcher = new TimeSeriesIndexSearcher(searcher, List.of()); + final TimeSeriesIndexSearcher timeSeriesSearcher = new TimeSeriesIndexSearcher(searcher, List.of(() -> { checkCancelled(); })); TimeSeriesBucketCollector bucketCollector = new TimeSeriesBucketCollector(bulkProcessor); bucketCollector.preCollection(); timeSeriesSearcher.search(new MatchAllDocsQuery(), bucketCollector); @@ -146,41 +147,60 @@ public RollupIndexerAction.ShardRollupResponse execute() throws IOException { } logger.info( - "Shard [{}] successfully sent [{}], indexed [{}], failed [{}], took [{}]", + "Shard [{}] successfully sent [{}], received source doc [{}], indexed rollup doc [{}], failed [{}], took [{}]", indexShard.shardId(), - numSent.get(), - numIndexed.get(), - numFailed.get(), + task.getNumReceived(), + task.getNumSent(), + task.getNumIndexed(), + task.getNumFailed(), TimeValue.timeValueMillis(System.currentTimeMillis() - startTime) ); - if (numIndexed.get() != numSent.get()) { + if (task.getNumIndexed() != task.getNumSent()) { throw new ElasticsearchException( "Shard [" + indexShard.shardId() + "] failed to index all rollup documents. Sent [" - + numSent.get() + + task.getNumSent() + "], indexed [" - + numIndexed.get() + + task.getNumIndexed() + "]." ); } - return new RollupIndexerAction.ShardRollupResponse(indexShard.shardId(), numIndexed.get()); + + return new RollupIndexerAction.ShardRollupResponse(indexShard.shardId(), task.getNumIndexed()); + } + + private void checkCancelled() { + if (task.isCancelled() || abort) { + logger.warn( + "Shard [{}] rollup abort, sent [{}], indexed [{}], failed[{}]", + indexShard.shardId(), + task.getNumSent(), + task.getNumIndexed(), + task.getNumFailed() + ); + throw new TaskCancelledException(format("Shard %s rollup cancelled", indexShard.shardId())); + } } private BulkProcessor createBulkProcessor() { final BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { - numSent.addAndGet(request.numberOfActions()); + task.addNumSent(request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - numIndexed.addAndGet(request.numberOfActions()); + task.addNumIndexed(request.numberOfActions()); if (response.hasFailures()) { - Map failures = Arrays.stream(response.getItems()) + List failedItems = Arrays.stream(response.getItems()) .filter(BulkItemResponse::isFailed) + .collect(Collectors.toList()); + task.addNumFailed(failedItems.size()); + + Map failures = failedItems.stream() .collect( Collectors.toMap( BulkItemResponse::getId, @@ -188,8 +208,10 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon (msg1, msg2) -> Objects.equals(msg1, msg2) ? msg1 : msg1 + "," + msg2 ) ); - numFailed.addAndGet(failures.size()); logger.error("Shard [{}] failed to populate rollup index. Failures: [{}]", indexShard.shardId(), failures); + + // cancel rollup task + abort = true; } } @@ -197,8 +219,11 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon public void afterBulk(long executionId, BulkRequest request, Throwable failure) { if (failure != null) { long items = request.numberOfActions(); - numFailed.addAndGet(items); + task.addNumFailed(items); logger.error(() -> format("Shard [%s] failed to populate rollup index.", indexShard.shardId()), failure); + + // cancel rollup task + abort = true; } } }; @@ -234,6 +259,7 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag return new LeafBucketCollector() { @Override public void collect(int docId, long owningBucketOrd) throws IOException { + task.addNumReceived(1); final BytesRef tsid = aggCtx.getTsid(); assert tsid != null : "Document without [" + TimeSeriesIdFieldMapper.NAME + "] field was found."; final int tsidOrd = aggCtx.getTsidOrd(); @@ -330,7 +356,8 @@ private void indexBucket(XContentBuilder doc) { @Override public void preCollection() throws IOException { - // no-op + // check cancel when start running + checkCancelled(); } @Override @@ -341,6 +368,10 @@ public void postCollection() throws IOException { indexBucket(doc); } bulkProcessor.flush(); + + // check cancel after the flush all data + checkCancelled(); + logger.info("Shard {} processed [{}] docs, created [{}] rollup buckets", indexShard.shardId(), docsProcessed, bucketsCreated); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java index e7ccf37edde29..be61f99916821 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupIndexerAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.downsample.RollupIndexerAction; +import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; import java.io.IOException; import java.util.Arrays; @@ -125,6 +126,7 @@ protected RollupIndexerAction.ShardRollupResponse shardOperation(RollupIndexerAc throws IOException { IndexService indexService = indicesService.indexService(request.shardId().getIndex()); RollupShardIndexer indexer = new RollupShardIndexer( + (RollupShardTask) task, client, indexService, request.shardId(), diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java index f10679117a2e9..f4d06397da0fd 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java @@ -39,12 +39,14 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; import org.elasticsearch.index.mapper.TimeSeriesParams; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchHit; @@ -65,6 +67,9 @@ import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tasks.TaskCancelHelper; +import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; @@ -75,6 +80,7 @@ import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; +import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; import org.elasticsearch.xpack.ilm.IndexLifecycle; import org.elasticsearch.xpack.rollup.Rollup; import org.junit.Before; @@ -96,11 +102,13 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static java.util.Collections.emptyMap; import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; public class DownsampleActionSingleNodeTests extends ESSingleNodeTestCase { @@ -571,6 +579,78 @@ public void testRollupDatastream() throws Exception { assertFalse(indices.stream().filter(i -> i.getName().equals(sourceIndex)).toList().isEmpty()); } + public void testCancelRollupIndexer() throws IOException { + // create rollup config and index documents into source index + DownsampleConfig config = new DownsampleConfig(randomInterval()); + SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() + .startObject() + .field(FIELD_TIMESTAMP, randomDateForInterval(config.getInterval())) + .field(FIELD_DIMENSION_1, randomAlphaOfLength(1)) + .field(FIELD_NUMERIC_1, randomDouble()) + .endObject(); + bulkIndex(sourceSupplier); + prepareSourceIndex(sourceIndex); + + IndicesService indexServices = getInstanceFromNode(IndicesService.class); + Index srcIndex = resolveIndex(sourceIndex); + IndexService indexService = indexServices.indexServiceSafe(srcIndex); + int shardNum = randomIntBetween(0, numOfShards - 1); + IndexShard shard = indexService.getShard(shardNum); + RollupShardTask task = new RollupShardTask( + randomLong(), + "rollup", + "action", + TaskId.EMPTY_TASK_ID, + rollupIndex, + config, + emptyMap(), + shard.shardId() + ); + TaskCancelHelper.cancel(task, "test cancel"); + + // re-use source index as temp index for test + RollupShardIndexer indexer = new RollupShardIndexer( + task, + client(), + indexService, + shard.shardId(), + rollupIndex, + config, + new String[] { FIELD_DIMENSION_1, FIELD_DIMENSION_2 }, + new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, + new String[] {} + ); + + TaskCancelledException exception = expectThrows(TaskCancelledException.class, () -> indexer.execute()); + assertThat(exception.getMessage(), equalTo("Shard [" + sourceIndex + "][" + shardNum + "] rollup cancelled")); + } + + public void testRollupBulkFailed() throws IOException { + // create rollup config and index documents into source index + DownsampleConfig config = new DownsampleConfig(randomInterval()); + SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() + .startObject() + .field(FIELD_TIMESTAMP, randomDateForInterval(config.getInterval())) + .field(FIELD_DIMENSION_1, randomAlphaOfLength(1)) + .field(FIELD_NUMERIC_1, randomDouble()) + .endObject(); + bulkIndex(sourceSupplier); + prepareSourceIndex(sourceIndex); + + // block rollup index + assertAcked( + client().admin() + .indices() + .preparePutTemplate(rollupIndex) + .setPatterns(List.of(rollupIndex)) + .setSettings(Settings.builder().put("index.blocks.write", "true").build()) + .get() + ); + + ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> rollup(sourceIndex, rollupIndex, config)); + assertThat(exception.getMessage(), equalTo("Unable to rollup index [" + sourceIndex + "]")); + } + private DateHistogramInterval randomInterval() { return ConfigTestHelpers.randomInterval(); }