Skip to content

Commit 9c8143f

Browse files
authored
Store reindexing result in reindex index (#45260)
Currently the result of a reindex persistent task is propogated and stored in the cluster state. This commit changes this so that only the ephemeral task-id, headers, and reindex state is store in the cluster state. Any result (exception or response) is stored in the reindex index. Relates to #42612.
1 parent fcaa334 commit 9c8143f

File tree

9 files changed

+389
-220
lines changed

9 files changed

+389
-220
lines changed

modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexJobState.java

+23-39
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.elasticsearch.index.reindex;
2121

22-
import org.elasticsearch.ElasticsearchException;
23-
import org.elasticsearch.common.Nullable;
2422
import org.elasticsearch.common.ParseField;
2523
import org.elasticsearch.common.io.stream.StreamInput;
2624
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -39,41 +37,32 @@ public class ReindexJobState implements Task.Status, PersistentTaskState {
3937
public static final String NAME = ReindexTask.NAME;
4038

4139
public static final ConstructingObjectParser<ReindexJobState, Void> PARSER =
42-
new ConstructingObjectParser<>(NAME, a -> new ReindexJobState((String) a[0], (BulkByScrollResponse) a[1],
43-
(ElasticsearchException) a[2]));
40+
new ConstructingObjectParser<>(NAME, a -> new ReindexJobState((String) a[0], (String) a[1]));
4441

4542
private static String EPHEMERAL_TASK_ID = "ephemeral_task_id";
46-
private static String REINDEX_RESPONSE = "reindex_response";
47-
private static String REINDEX_EXCEPTION = "reindex_exception";
43+
private static String STATUS = "status";
4844

4945
static {
5046
PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField(EPHEMERAL_TASK_ID));
51-
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> BulkByScrollResponse.fromXContent(p),
52-
new ParseField(REINDEX_RESPONSE));
53-
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> ElasticsearchException.fromXContent(p),
54-
new ParseField(REINDEX_EXCEPTION));
47+
PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField(STATUS));
5548
}
5649

5750
private final TaskId ephemeralTaskId;
58-
private final BulkByScrollResponse reindexResponse;
59-
private final ElasticsearchException jobException;
51+
private final Status status;
6052

61-
private ReindexJobState(String ephemeralTaskId, BulkByScrollResponse reindexResponse, ElasticsearchException jobException) {
62-
this(new TaskId(ephemeralTaskId), reindexResponse, jobException);
53+
private ReindexJobState(String ephemeralTaskId, String status) {
54+
this(new TaskId(ephemeralTaskId), Status.valueOf(status));
6355
}
6456

65-
ReindexJobState(TaskId ephemeralTaskId, @Nullable BulkByScrollResponse reindexResponse,
66-
@Nullable ElasticsearchException jobException) {
57+
ReindexJobState(TaskId ephemeralTaskId, Status status) {
58+
assert status != null : "Status cannot be null";
6759
this.ephemeralTaskId = ephemeralTaskId;
68-
assert (reindexResponse == null) || (jobException == null) : "Either response or exception must be null";
69-
this.reindexResponse = reindexResponse;
70-
this.jobException = jobException;
60+
this.status = status;
7161
}
7262

7363
public ReindexJobState(StreamInput in) throws IOException {
7464
ephemeralTaskId = TaskId.readFromStream(in);
75-
reindexResponse = in.readOptionalWriteable(BulkByScrollResponse::new);
76-
jobException = in.readException();
65+
status = in.readEnum(Status.class);
7766
}
7867

7968
@Override
@@ -84,35 +73,23 @@ public String getWriteableName() {
8473
@Override
8574
public void writeTo(StreamOutput out) throws IOException {
8675
ephemeralTaskId.writeTo(out);
87-
out.writeOptionalWriteable(reindexResponse);
88-
out.writeException(jobException);
76+
out.writeEnum(status);
8977
}
9078

9179
@Override
9280
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
9381
builder.startObject();
9482
builder.field(EPHEMERAL_TASK_ID, ephemeralTaskId.toString());
95-
if (reindexResponse != null) {
96-
builder.field(REINDEX_RESPONSE);
97-
builder.startObject();
98-
reindexResponse.toXContent(builder, params);
99-
builder.endObject();
100-
}
101-
if (jobException != null) {
102-
builder.field(REINDEX_EXCEPTION);
103-
builder.startObject();
104-
jobException.toXContent(builder, params);
105-
builder.endObject();
106-
}
83+
builder.field(STATUS, status);
10784
return builder.endObject();
10885
}
10986

110-
public BulkByScrollResponse getReindexResponse() {
111-
return reindexResponse;
87+
public boolean isDone() {
88+
return status != Status.STARTED;
11289
}
11390

114-
public ElasticsearchException getJobException() {
115-
return jobException;
91+
public Status getStatus() {
92+
return status;
11693
}
11794

11895
public TaskId getEphemeralTaskId() {
@@ -122,4 +99,11 @@ public TaskId getEphemeralTaskId() {
12299
public static ReindexJobState fromXContent(XContentParser parser) {
123100
return PARSER.apply(parser, null);
124101
}
102+
103+
public enum Status {
104+
STARTED,
105+
FAILED_TO_READ_FROM_REINDEX_INDEX,
106+
FAILED_TO_WRITE_TO_REINDEX_INDEX,
107+
DONE
108+
}
125109
}

0 commit comments

Comments
 (0)