Skip to content

Commit d02067d

Browse files
committed
support task cancel
1 parent 74a1e3e commit d02067d

File tree

8 files changed

+398
-15
lines changed

8 files changed

+398
-15
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@
153153
import org.elasticsearch.xpack.core.rollup.action.RollupAction;
154154
import org.elasticsearch.xpack.core.rollup.action.RollupIndexerAction;
155155
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
156+
import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus;
156157
import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction;
157158
import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
158159
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
@@ -474,6 +475,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
474475
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJob.NAME, RollupJob::new),
475476
new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new),
476477
new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new),
478+
new NamedWriteableRegistry.Entry(Task.Status.class, RollupShardStatus.NAME, RollupShardStatus::new),
477479
// ccr
478480
new NamedWriteableRegistry.Entry(AutoFollowMetadata.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new),
479481
new NamedWriteableRegistry.Entry(Metadata.Custom.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new),

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupIndexerAction.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,20 @@ public void writeTo(StreamOutput out) throws IOException {
230230
super.writeTo(out);
231231
request.writeTo(out);
232232
}
233+
234+
@Override
235+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
236+
return new RollupShardTask(
237+
id,
238+
type,
239+
action,
240+
parentTaskId,
241+
request.rollupRequest.getRollupIndex(),
242+
request.rollupRequest.getRollupConfig(),
243+
headers,
244+
shardId()
245+
);
246+
}
233247
}
234248

235249
public static class ShardRollupResponse extends BroadcastShardResponse {
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.rollup.action;
9+
10+
import org.elasticsearch.common.Strings;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.index.shard.ShardId;
14+
import org.elasticsearch.tasks.Task;
15+
import org.elasticsearch.xcontent.ConstructingObjectParser;
16+
import org.elasticsearch.xcontent.ParseField;
17+
import org.elasticsearch.xcontent.XContentBuilder;
18+
import org.elasticsearch.xcontent.XContentParser;
19+
20+
import java.io.IOException;
21+
import java.time.Instant;
22+
import java.util.Objects;
23+
import java.util.concurrent.atomic.AtomicLong;
24+
25+
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
26+
27+
public class RollupShardStatus implements Task.Status {
28+
public static final String NAME = "rollup-index-shard";
29+
private static final ParseField SHARD_FIELD = new ParseField("shard");
30+
private static final ParseField STATUS_FIELD = new ParseField("status");
31+
private static final ParseField START_TIME_FIELD = new ParseField("start_time");
32+
private static final ParseField IN_NUM_DOCS_RECEIVED_FIELD = new ParseField("in_num_docs_received");
33+
private static final ParseField OUT_NUM_DOCS_SENT_FIELD = new ParseField("out_num_docs_sent");
34+
private static final ParseField OUT_NUM_DOCS_INDEXED_FIELD = new ParseField("out_num_docs_indexed");
35+
private static final ParseField OUT_NUM_DOCS_FAILED_FIELD = new ParseField("out_num_docs_failed");
36+
37+
private final ShardId shardId;
38+
private final long rollupStart;
39+
private Status status;
40+
private AtomicLong numReceived = new AtomicLong(0);
41+
private AtomicLong numSent = new AtomicLong(0);
42+
private AtomicLong numIndexed = new AtomicLong(0);
43+
private AtomicLong numFailed = new AtomicLong(0);
44+
45+
private static final ConstructingObjectParser<RollupShardStatus, Void> PARSER;
46+
static {
47+
PARSER = new ConstructingObjectParser<>(
48+
NAME,
49+
args -> new RollupShardStatus(
50+
ShardId.fromString((String) args[0]),
51+
Status.valueOf((String) args[1]),
52+
Instant.parse((String) args[2]).toEpochMilli(),
53+
new AtomicLong((Long) args[3]),
54+
new AtomicLong((Long) args[4]),
55+
new AtomicLong((Long) args[5]),
56+
new AtomicLong((Long) args[6])
57+
)
58+
);
59+
60+
PARSER.declareString(constructorArg(), SHARD_FIELD);
61+
PARSER.declareString(constructorArg(), STATUS_FIELD);
62+
PARSER.declareString(constructorArg(), START_TIME_FIELD);
63+
PARSER.declareLong(constructorArg(), IN_NUM_DOCS_RECEIVED_FIELD);
64+
PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_SENT_FIELD);
65+
PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_INDEXED_FIELD);
66+
PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_FAILED_FIELD);
67+
}
68+
69+
public RollupShardStatus(StreamInput in) throws IOException {
70+
shardId = new ShardId(in);
71+
status = in.readEnum(Status.class);
72+
rollupStart = in.readLong();
73+
numReceived = new AtomicLong(in.readLong());
74+
numSent = new AtomicLong(in.readLong());
75+
numIndexed = new AtomicLong(in.readLong());
76+
numFailed = new AtomicLong(in.readLong());
77+
}
78+
79+
public RollupShardStatus(
80+
ShardId shardId,
81+
Status status,
82+
long rollupStart,
83+
AtomicLong numReceived,
84+
AtomicLong numSent,
85+
AtomicLong numIndexed,
86+
AtomicLong numFailed
87+
) {
88+
this.shardId = shardId;
89+
this.status = status;
90+
this.rollupStart = rollupStart;
91+
this.numReceived = numReceived;
92+
this.numSent = numSent;
93+
this.numIndexed = numIndexed;
94+
this.numFailed = numFailed;
95+
}
96+
97+
public RollupShardStatus(ShardId shardId) {
98+
status = Status.STARTED;
99+
this.shardId = shardId;
100+
this.rollupStart = System.currentTimeMillis();
101+
}
102+
103+
public void init(AtomicLong numReceived, AtomicLong numSent, AtomicLong numIndexed, AtomicLong numFailed) {
104+
this.numReceived = numReceived;
105+
this.numSent = numSent;
106+
this.numIndexed = numIndexed;
107+
this.numFailed = numFailed;
108+
}
109+
110+
public Status getStatus() {
111+
return status;
112+
}
113+
114+
public void setStatus(Status status) {
115+
this.status = status;
116+
}
117+
118+
public static RollupShardStatus fromXContent(XContentParser parser) throws IOException {
119+
return PARSER.parse(parser, null);
120+
}
121+
122+
@Override
123+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
124+
builder.startObject();
125+
builder.field(SHARD_FIELD.getPreferredName(), shardId);
126+
builder.field(STATUS_FIELD.getPreferredName(), status);
127+
builder.field(START_TIME_FIELD.getPreferredName(), Instant.ofEpochMilli(rollupStart).toString());
128+
builder.field(IN_NUM_DOCS_RECEIVED_FIELD.getPreferredName(), numReceived.get());
129+
builder.field(OUT_NUM_DOCS_SENT_FIELD.getPreferredName(), numSent.get());
130+
builder.field(OUT_NUM_DOCS_INDEXED_FIELD.getPreferredName(), numIndexed.get());
131+
builder.field(OUT_NUM_DOCS_FAILED_FIELD.getPreferredName(), numFailed.get());
132+
builder.endObject();
133+
return builder;
134+
}
135+
136+
@Override
137+
public String getWriteableName() {
138+
return NAME;
139+
}
140+
141+
@Override
142+
public void writeTo(StreamOutput out) throws IOException {
143+
shardId.writeTo(out);
144+
out.writeEnum(status);
145+
out.writeLong(rollupStart);
146+
out.writeLong(numReceived.get());
147+
out.writeLong(numSent.get());
148+
out.writeLong(numIndexed.get());
149+
out.writeLong(numFailed.get());
150+
}
151+
152+
@Override
153+
public boolean equals(Object o) {
154+
if (this == o) {
155+
return true;
156+
}
157+
if (o == null || getClass() != o.getClass()) {
158+
return false;
159+
}
160+
RollupShardStatus that = (RollupShardStatus) o;
161+
return rollupStart == that.rollupStart
162+
&& Objects.equals(shardId.getIndexName(), that.shardId.getIndexName())
163+
&& Objects.equals(shardId.id(), that.shardId.id())
164+
&& status == that.status
165+
&& Objects.equals(numReceived.get(), that.numReceived.get())
166+
&& Objects.equals(numSent.get(), that.numSent.get())
167+
&& Objects.equals(numIndexed.get(), that.numIndexed.get())
168+
&& Objects.equals(numFailed.get(), that.numFailed.get());
169+
}
170+
171+
@Override
172+
public int hashCode() {
173+
return Objects.hash(
174+
shardId.getIndexName(),
175+
shardId.id(),
176+
rollupStart,
177+
status,
178+
numReceived.get(),
179+
numSent.get(),
180+
numIndexed.get(),
181+
numFailed.get()
182+
);
183+
}
184+
185+
@Override
186+
public String toString() {
187+
return Strings.toString(this);
188+
}
189+
190+
public enum Status {
191+
STARTED,
192+
FINISHED,
193+
ABORT
194+
}
195+
196+
public void setNumSent(AtomicLong numSent) {
197+
this.numSent = numSent;
198+
}
199+
200+
public void setNumIndexed(AtomicLong numIndexed) {
201+
this.numIndexed = numIndexed;
202+
}
203+
204+
public void setNumFailed(AtomicLong numFailed) {
205+
this.numFailed = numFailed;
206+
}
207+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.rollup.action;
9+
10+
import java.util.Map;
11+
12+
import org.elasticsearch.index.shard.ShardId;
13+
import org.elasticsearch.tasks.CancellableTask;
14+
import org.elasticsearch.tasks.TaskId;
15+
import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
16+
import org.elasticsearch.xpack.core.rollup.RollupField;
17+
18+
public class RollupShardTask extends CancellableTask {
19+
private String rollupIndex;
20+
private RollupActionConfig config;
21+
private volatile RollupShardStatus status;
22+
23+
public RollupShardTask(
24+
long id,
25+
String type,
26+
String action,
27+
TaskId parentTask,
28+
String rollupIndex,
29+
RollupActionConfig config,
30+
Map<String, String> headers,
31+
ShardId shardId
32+
) {
33+
super(id, type, action, RollupField.NAME + "_" + rollupIndex + "[" + shardId.id() + "]", parentTask, headers);
34+
this.rollupIndex = rollupIndex;
35+
this.config = config;
36+
this.status = new RollupShardStatus(shardId);
37+
}
38+
39+
public String getRollupIndex() {
40+
return rollupIndex;
41+
}
42+
43+
public RollupActionConfig config() {
44+
return config;
45+
}
46+
47+
@Override
48+
public Status getStatus() {
49+
return status;
50+
}
51+
52+
@Override
53+
public void onCancelled() {
54+
status.setStatus(RollupShardStatus.Status.ABORT);
55+
}
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.rollup.action;
9+
10+
import org.elasticsearch.common.io.stream.Writeable.Reader;
11+
import org.elasticsearch.index.shard.ShardId;
12+
import org.elasticsearch.test.AbstractSerializingTestCase;
13+
import org.elasticsearch.xcontent.XContentParser;
14+
15+
import java.io.IOException;
16+
import java.util.concurrent.atomic.AtomicLong;
17+
18+
public class RollupShardStatusSerializingTests extends AbstractSerializingTestCase<RollupShardStatus> {
19+
@Override
20+
protected RollupShardStatus doParseInstance(XContentParser parser) throws IOException {
21+
return RollupShardStatus.fromXContent(parser);
22+
}
23+
24+
@Override
25+
protected Reader<RollupShardStatus> instanceReader() {
26+
return RollupShardStatus::new;
27+
}
28+
29+
@Override
30+
protected RollupShardStatus createTestInstance() {
31+
RollupShardStatus rollupShardStatus = new RollupShardStatus(
32+
new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(5))
33+
);
34+
rollupShardStatus.init(
35+
new AtomicLong(randomNonNegativeLong()),
36+
new AtomicLong(randomNonNegativeLong()),
37+
new AtomicLong(randomNonNegativeLong()),
38+
new AtomicLong(randomNonNegativeLong())
39+
);
40+
return rollupShardStatus;
41+
}
42+
}

0 commit comments

Comments
 (0)