Skip to content

[RollupV2]: make RollupAction available and improve some features #82944

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 10 commits into from
Closed
6 changes: 6 additions & 0 deletions docs/changelog/82944.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 82944
summary: [RollupV2]: make RollupAction available and improve some features
area: Rollup
type: feature
issues:
- 42720
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;

public class RollupAction extends ActionType<AcknowledgedResponse> {
public static final RollupAction INSTANCE = new RollupAction();
public static final String NAME = "indices:admin/xpack/rollup";
Expand Down Expand Up @@ -89,9 +91,23 @@ public RollupActionConfig getRollupConfig() {
return rollupConfig;
}

public void setRollupConfig(RollupActionConfig rollupConfig) {
this.rollupConfig = rollupConfig;
}

@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException validationException = null;
if (sourceIndex == null) {
validationException = addValidationError("rollup origin index is missing", null);
}
if (rollupIndex == null) {
validationException = addValidationError("rollup index is missing", validationException);
}
if (rollupConfig == null) {
validationException = addValidationError("rollup config is missing", validationException);
}
return validationException;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,20 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new RollupShardTask(
id,
type,
action,
parentTaskId,
request.rollupRequest.getRollupIndex(),
request.rollupRequest.getRollupConfig(),
headers,
shardId()
);
}
}

public static class ShardResponse extends BroadcastShardResponse {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.rollup.action;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;

public class RollupShardStatus implements Task.Status {
public static final String NAME = "rollup-index-shard";
private static final ParseField SHARD_FIELD = new ParseField("shard");
private static final ParseField STATUS_FIELD = new ParseField("status");
private static final ParseField START_TIME_FIELD = new ParseField("start_time");
private static final ParseField IN_NUM_DOCS_RECEIVED_FIELD = new ParseField("in_num_docs_received");
private static final ParseField IN_NUM_DOCS_SKIPPED_FIELD = new ParseField("in_num_docs_skipped");
private static final ParseField OUT_NUM_DOCS_SENT_FIELD = new ParseField("out_num_docs_sent");
private static final ParseField OUT_NUM_DOCS_INDEXED_FIELD = new ParseField("out_num_docs_indexed");
private static final ParseField OUT_NUM_DOCS_FAILED_FIELD = new ParseField("out_num_docs_failed");

private final ShardId shardId;
private final long rollupStart;
private Status status;
private AtomicLong numReceived = new AtomicLong(0);
private AtomicLong numSkip = new AtomicLong(0);
private AtomicLong numSent = new AtomicLong(0);
private AtomicLong numIndexed = new AtomicLong(0);
private AtomicLong numFailed = new AtomicLong(0);

private static final ConstructingObjectParser<RollupShardStatus, Void> PARSER;
static {
PARSER = new ConstructingObjectParser<>(
NAME,
args -> new RollupShardStatus(
ShardId.fromString((String) args[0]),
Status.valueOf((String) args[1]),
Instant.parse((String) args[2]).toEpochMilli(),
new AtomicLong((Long) args[3]),
new AtomicLong((Long) args[4]),
new AtomicLong((Long) args[5]),
new AtomicLong((Long) args[6]),
new AtomicLong((Long) args[7])
)
);

PARSER.declareString(constructorArg(), SHARD_FIELD);
PARSER.declareString(constructorArg(), STATUS_FIELD);
PARSER.declareString(constructorArg(), START_TIME_FIELD);
PARSER.declareLong(constructorArg(), IN_NUM_DOCS_RECEIVED_FIELD);
PARSER.declareLong(constructorArg(), IN_NUM_DOCS_SKIPPED_FIELD);
PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_SENT_FIELD);
PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_INDEXED_FIELD);
PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_FAILED_FIELD);
}

public RollupShardStatus(StreamInput in) throws IOException {
shardId = new ShardId(in);
status = in.readEnum(Status.class);
rollupStart = in.readLong();
numReceived = new AtomicLong(in.readLong());
numSkip = new AtomicLong(in.readLong());
numSent = new AtomicLong(in.readLong());
numIndexed = new AtomicLong(in.readLong());
numFailed = new AtomicLong(in.readLong());
}

public RollupShardStatus(
ShardId shardId,
Status status,
long rollupStart,
AtomicLong numReceived,
AtomicLong numSkip,
AtomicLong numSent,
AtomicLong numIndexed,
AtomicLong numFailed
) {
this.shardId = shardId;
this.status = status;
this.rollupStart = rollupStart;
this.numReceived = numReceived;
this.numSkip = numSkip;
this.numSent = numSent;
this.numIndexed = numIndexed;
this.numFailed = numFailed;
}

public RollupShardStatus(ShardId shardId) {
status = Status.INIT;
this.shardId = shardId;
this.rollupStart = System.currentTimeMillis();
}

public void init(AtomicLong numReceived, AtomicLong numSkip, AtomicLong numSent, AtomicLong numIndexed, AtomicLong numFailed) {
this.numReceived = numReceived;
this.numSkip = numSkip;
this.numSent = numSent;
this.numIndexed = numIndexed;
this.numFailed = numFailed;
}

public Status getStatus() {
return status;
}

public void setStatus(Status status) {
this.status = status;
}

public static RollupShardStatus fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SHARD_FIELD.getPreferredName(), shardId);
builder.field(STATUS_FIELD.getPreferredName(), status);
builder.field(START_TIME_FIELD.getPreferredName(), Instant.ofEpochMilli(rollupStart).toString());
builder.field(IN_NUM_DOCS_RECEIVED_FIELD.getPreferredName(), numReceived.get());
builder.field(IN_NUM_DOCS_SKIPPED_FIELD.getPreferredName(), numSkip.get());
builder.field(OUT_NUM_DOCS_SENT_FIELD.getPreferredName(), numSent.get());
builder.field(OUT_NUM_DOCS_INDEXED_FIELD.getPreferredName(), numIndexed.get());
builder.field(OUT_NUM_DOCS_FAILED_FIELD.getPreferredName(), numFailed.get());
builder.endObject();
return builder;
}

@Override
public String getWriteableName() {
return NAME;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeEnum(status);
out.writeLong(rollupStart);
out.writeLong(numReceived.get());
out.writeLong(numSkip.get());
out.writeLong(numSent.get());
out.writeLong(numIndexed.get());
out.writeLong(numFailed.get());
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RollupShardStatus that = (RollupShardStatus) o;
return rollupStart == that.rollupStart
&& Objects.equals(shardId.getIndexName(), that.shardId.getIndexName())
&& Objects.equals(shardId.id(), that.shardId.id())
&& status == that.status
&& Objects.equals(numReceived.get(), that.numReceived.get())
&& Objects.equals(numSkip.get(), that.numSkip.get())
&& Objects.equals(numSent.get(), that.numSent.get())
&& Objects.equals(numIndexed.get(), that.numIndexed.get())
&& Objects.equals(numFailed.get(), that.numFailed.get());
}

@Override
public int hashCode() {
return Objects.hash(
shardId.getIndexName(),
shardId.id(),
rollupStart,
status,
numReceived.get(),
numSkip.get(),
numSent.get(),
numIndexed.get(),
numFailed.get()
);
}

@Override
public String toString() {
return Strings.toString(this);
}

public enum Status {
INIT,
ROLLING,
STOP,
ABORT
}

public void setNumSent(AtomicLong numSent) {
this.numSent = numSent;
}

public void setNumIndexed(AtomicLong numIndexed) {
this.numIndexed = numIndexed;
}

public void setNumFailed(AtomicLong numFailed) {
this.numFailed = numFailed;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.rollup.action;

import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
import org.elasticsearch.xpack.core.rollup.RollupField;

import java.util.Map;

public class RollupShardTask extends CancellableTask {
private String rollupIndex;
private RollupActionConfig config;
private volatile RollupShardStatus status;

public RollupShardTask(
long id,
String type,
String action,
TaskId parentTask,
String rollupIndex,
RollupActionConfig config,
Map<String, String> headers,
ShardId shardId
) {
super(id, type, action, RollupField.NAME + "_" + rollupIndex + "[" + shardId.id() + "]", parentTask, headers);
this.rollupIndex = rollupIndex;
this.config = config;
this.status = new RollupShardStatus(shardId);
}

public String getRollupIndex() {
return rollupIndex;
}

public RollupActionConfig config() {
return config;
}

@Override
public Status getStatus() {
return status;
}

@Override
public void onCancelled() {
status.setStatus(RollupShardStatus.Status.ABORT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.TextFieldMapper;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
Expand Down Expand Up @@ -85,7 +86,9 @@ public void validateMappings(
Map<String, FieldCapabilities> fieldCaps = fieldCapsResponse.get(field);
if (fieldCaps != null && fieldCaps.isEmpty() == false) {
fieldCaps.forEach((key, value) -> {
if (key.equals(KeywordFieldMapper.CONTENT_TYPE) || key.equals(TextFieldMapper.CONTENT_TYPE)) {
if (key.equals(KeywordFieldMapper.CONTENT_TYPE)
|| key.equals(TextFieldMapper.CONTENT_TYPE)
|| key.equals(TimeSeriesIdFieldMapper.CONTENT_TYPE)) {
if (value.isAggregatable() == false) {
validationException.addValidationError(
"The field [" + field + "] must be aggregatable across all indices, " + "but is not."
Expand Down
Loading