Skip to content

TSDB: Downsampling support cancelled #88496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction;
import org.elasticsearch.xpack.core.rollup.action.PutRollupJobAction;
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus;
import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction;
import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
Expand Down Expand Up @@ -460,6 +461,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJob.NAME, RollupJob::new),
new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new),
new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new),
new NamedWriteableRegistry.Entry(Task.Status.class, RollupShardStatus.NAME, RollupShardStatus::new),
// ccr
new NamedWriteableRegistry.Entry(AutoFollowMetadata.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new),
new NamedWriteableRegistry.Entry(Metadata.Custom.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.rollup.action.RollupShardTask;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -258,6 +259,20 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new RollupShardTask(
id,
type,
action,
parentTaskId,
request.rollupRequest.getSourceIndex(),
request.rollupRequest.getDownsampleConfig(),
headers,
shardId()
);
}
}

public static class ShardRollupResponse extends BroadcastShardResponse {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.rollup.action;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.time.Instant;
import java.util.Objects;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;

public class RollupShardStatus implements Task.Status {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the status class be immutable? Typically the task class has modifiable state and when we create the status class it reads fields from the task and then this status class is immutable (an immutable view of the state).

The RollupShardTask should then contain the AtomicInteger counters and the this class should then be passed down to RollupShardIndexer.

For an example you can take a look at reindex, at the BulkByScrollTask and BulkByScrollTask.Status classes.

public static final String NAME = "rollup-index-shard";
private static final ParseField SHARD_FIELD = new ParseField("shard");
private static final ParseField START_TIME_FIELD = new ParseField("start_time");
private static final ParseField IN_NUM_DOCS_RECEIVED_FIELD = new ParseField("in_num_docs_received");
private static final ParseField OUT_NUM_DOCS_SENT_FIELD = new ParseField("out_num_docs_sent");
private static final ParseField OUT_NUM_DOCS_INDEXED_FIELD = new ParseField("out_num_docs_indexed");
private static final ParseField OUT_NUM_DOCS_FAILED_FIELD = new ParseField("out_num_docs_failed");

private final ShardId shardId;
private final long rollupStart;
private final long numReceived;
private final long numSent;
private final long numIndexed;
private final long numFailed;

private static final ConstructingObjectParser<RollupShardStatus, Void> PARSER;
static {
PARSER = new ConstructingObjectParser<>(
NAME,
args -> new RollupShardStatus(
ShardId.fromString((String) args[0]),
Instant.parse((String) args[1]).toEpochMilli(),
(Long) args[2],
(Long) args[3],
(Long) args[4],
(Long) args[5]
)
);

PARSER.declareString(constructorArg(), SHARD_FIELD);
PARSER.declareString(constructorArg(), START_TIME_FIELD);
PARSER.declareLong(constructorArg(), IN_NUM_DOCS_RECEIVED_FIELD);
PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_SENT_FIELD);
PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_INDEXED_FIELD);
PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_FAILED_FIELD);
}

public RollupShardStatus(StreamInput in) throws IOException {
shardId = new ShardId(in);
rollupStart = in.readLong();
numReceived = in.readLong();
numSent = in.readLong();
numIndexed = in.readLong();
numFailed = in.readLong();
}

public RollupShardStatus(ShardId shardId, long rollupStart, long numReceived, long numSent, long numIndexed, long numFailed) {
this.shardId = shardId;
this.rollupStart = rollupStart;
this.numReceived = numReceived;
this.numSent = numSent;
this.numIndexed = numIndexed;
this.numFailed = numFailed;
}

public static RollupShardStatus fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SHARD_FIELD.getPreferredName(), shardId);
builder.field(START_TIME_FIELD.getPreferredName(), Instant.ofEpochMilli(rollupStart).toString());
builder.field(IN_NUM_DOCS_RECEIVED_FIELD.getPreferredName(), numReceived);
builder.field(OUT_NUM_DOCS_SENT_FIELD.getPreferredName(), numSent);
builder.field(OUT_NUM_DOCS_INDEXED_FIELD.getPreferredName(), numIndexed);
builder.field(OUT_NUM_DOCS_FAILED_FIELD.getPreferredName(), numFailed);
builder.endObject();
return builder;
}

@Override
public String getWriteableName() {
return NAME;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeLong(rollupStart);
out.writeLong(numReceived);
out.writeLong(numSent);
out.writeLong(numIndexed);
out.writeLong(numFailed);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RollupShardStatus that = (RollupShardStatus) o;
return rollupStart == that.rollupStart
&& Objects.equals(shardId.getIndexName(), that.shardId.getIndexName())
&& Objects.equals(shardId.id(), that.shardId.id())
&& Objects.equals(numReceived, that.numReceived)
&& Objects.equals(numSent, that.numSent)
&& Objects.equals(numIndexed, that.numIndexed)
&& Objects.equals(numFailed, that.numFailed);
}

@Override
public int hashCode() {
return Objects.hash(shardId.getIndexName(), shardId.id(), rollupStart, numReceived, numSent, numIndexed, numFailed);
}

@Override
public String toString() {
return Strings.toString(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.rollup.action;

import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.downsample.DownsampleConfig;
import org.elasticsearch.xpack.core.rollup.RollupField;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class RollupShardTask extends CancellableTask {
private final String rollupIndex;
private final DownsampleConfig config;
private final ShardId shardId;
private final long rollupStartTime;
private final AtomicLong numReceived = new AtomicLong(0);
private final AtomicLong numSent = new AtomicLong(0);
private final AtomicLong numIndexed = new AtomicLong(0);
private final AtomicLong numFailed = new AtomicLong(0);

public RollupShardTask(
long id,
String type,
String action,
TaskId parentTask,
String rollupIndex,
DownsampleConfig config,
Map<String, String> headers,
ShardId shardId
) {
super(id, type, action, RollupField.NAME + "_" + rollupIndex + "[" + shardId.id() + "]", parentTask, headers);
this.rollupIndex = rollupIndex;
this.config = config;
this.shardId = shardId;
this.rollupStartTime = System.currentTimeMillis();
}

public String getRollupIndex() {
return rollupIndex;
}

public DownsampleConfig config() {
return config;
}

@Override
public Status getStatus() {
return new RollupShardStatus(shardId, rollupStartTime, numReceived.get(), numSent.get(), numIndexed.get(), numFailed.get());
}

public long getNumReceived() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just have getter methods for the AtomicLong fields?

return numReceived.get();
}

public long getNumSent() {
return numSent.get();
}

public long getNumIndexed() {
return numIndexed.get();
}

public long getNumFailed() {
return numFailed.get();
}

public void addNumReceived(long count) {
numReceived.addAndGet(count);
}

public void addNumSent(long count) {
numSent.addAndGet(count);
}

public void addNumIndexed(long count) {
numIndexed.addAndGet(count);
}

public void addNumFailed(long count) {
numFailed.addAndGet(count);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.rollup.action;

import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;

public class RollupShardStatusSerializingTests extends AbstractXContentSerializingTestCase<RollupShardStatus> {
@Override
protected RollupShardStatus doParseInstance(XContentParser parser) throws IOException {
return RollupShardStatus.fromXContent(parser);
}

@Override
protected Reader<RollupShardStatus> instanceReader() {
return RollupShardStatus::new;
}

@Override
protected RollupShardStatus createTestInstance() {
RollupShardStatus rollupShardStatus = new RollupShardStatus(
new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(5)),
randomMillisUpToYear9999(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong()
);
return rollupShardStatus;
}
}
Loading