Skip to content

Commit 33d8fd4

Browse files
How could I....
1 parent c06dc7a commit 33d8fd4

File tree

6 files changed

+44
-56
lines changed

6 files changed

+44
-56
lines changed

modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexTaskParams.java

+2-16
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import org.elasticsearch.persistent.PersistentTaskParams;
3030

3131
import java.io.IOException;
32-
import java.util.Collection;
33-
import java.util.List;
3432
import java.util.Map;
3533

3634
public class ReindexTaskParams implements PersistentTaskParams {
@@ -39,32 +37,26 @@ public class ReindexTaskParams implements PersistentTaskParams {
3937

4038
@SuppressWarnings("unchecked")
4139
public static final ConstructingObjectParser<ReindexTaskParams, Void> PARSER
42-
= new ConstructingObjectParser<>(NAME, a -> new ReindexTaskParams((Boolean) a[0], (List<List<String>>) a[1],
43-
(Map<String, String>) a[2]));
40+
= new ConstructingObjectParser<>(NAME, a -> new ReindexTaskParams((Boolean) a[0], (Map<String, String>) a[1]));
4441

4542
private static String STORE_RESULT = "store_result";
4643
private static String HEADERS = "headers";
47-
private static String INDEX_GROUPS = "index_groups";
4844

4945
static {
5046
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), new ParseField(STORE_RESULT));
51-
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> p.list(), new ParseField(INDEX_GROUPS));
5247
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), new ParseField(HEADERS));
5348
}
5449

5550
private final boolean storeResult;
56-
private final List<? extends Collection<String>> indexGroups;
5751
private final Map<String, String> headers;
5852

59-
public ReindexTaskParams(boolean storeResult, List<? extends Collection<String>> indexGroups, Map<String, String> headers) {
53+
public ReindexTaskParams(boolean storeResult, Map<String, String> headers) {
6054
this.storeResult = storeResult;
61-
this.indexGroups = indexGroups;
6255
this.headers = headers;
6356
}
6457

6558
public ReindexTaskParams(StreamInput in) throws IOException {
6659
storeResult = in.readBoolean();
67-
indexGroups = in.readList(StreamInput::readStringList);
6860
headers = in.readMap(StreamInput::readString, StreamInput::readString);
6961
}
7062

@@ -82,15 +74,13 @@ public Version getMinimalSupportedVersion() {
8274
@Override
8375
public void writeTo(StreamOutput out) throws IOException {
8476
out.writeBoolean(storeResult);
85-
out.writeCollection(indexGroups, StreamOutput::writeStringCollection);
8677
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
8778
}
8879

8980
@Override
9081
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
9182
builder.startObject();
9283
builder.field(STORE_RESULT, storeResult);
93-
builder.field(INDEX_GROUPS, indexGroups);
9484
builder.field(HEADERS, headers);
9585
return builder.endObject();
9686
}
@@ -99,10 +89,6 @@ public boolean shouldStoreResult() {
9989
return storeResult;
10090
}
10191

102-
public List<? extends Collection<String>> getIndexGroups() {
103-
return indexGroups;
104-
}
105-
10692
public Map<String, String> getHeaders() {
10793
return headers;
10894
}

modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexTaskStateDoc.java

+29-12
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,20 @@
2929
import org.elasticsearch.rest.RestStatus;
3030

3131
import java.io.IOException;
32+
import java.util.Collection;
33+
import java.util.List;
34+
import java.util.Set;
3235

3336
public class ReindexTaskStateDoc implements ToXContentObject {
3437

38+
@SuppressWarnings("unchecked")
3539
public static final ConstructingObjectParser<ReindexTaskStateDoc, Void> PARSER =
36-
new ConstructingObjectParser<>("reindex/index_state", a -> new ReindexTaskStateDoc((ReindexRequest) a[0], (Long) a[1],
37-
(BulkByScrollResponse) a[2], (ElasticsearchException) a[3], (Integer) a[4], (ScrollableHitSource.Checkpoint) a[5]));
40+
new ConstructingObjectParser<>("reindex/index_state", a -> new ReindexTaskStateDoc((ReindexRequest) a[0], (List<Set<String>>) a[1],
41+
(Long) a[2], (BulkByScrollResponse) a[3], (ElasticsearchException) a[4], (Integer) a[5],
42+
(ScrollableHitSource.Checkpoint) a[6]));
3843

3944
private static final String REINDEX_REQUEST = "request";
45+
private static final String INDEX_GROUPS = "index_groups";
4046
private static final String ALLOCATION = "allocation";
4147
private static final String REINDEX_RESPONSE = "response";
4248
private static final String REINDEX_EXCEPTION = "exception";
@@ -46,6 +52,7 @@ public class ReindexTaskStateDoc implements ToXContentObject {
4652
static {
4753
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ReindexRequest.fromXContentWithParams(p),
4854
new ParseField(REINDEX_REQUEST));
55+
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> p.list(), new ParseField(INDEX_GROUPS));
4956
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), new ParseField(ALLOCATION));
5057
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> BulkByScrollResponse.fromXContent(p),
5158
new ParseField(REINDEX_RESPONSE));
@@ -57,32 +64,35 @@ public class ReindexTaskStateDoc implements ToXContentObject {
5764
}
5865

5966
private final ReindexRequest reindexRequest;
67+
private final List<? extends Collection<String>> indexGroups;
6068
private final Long allocationId;
6169
private final BulkByScrollResponse reindexResponse;
6270
private final ElasticsearchException exception;
6371
private final RestStatus failureStatusCode;
6472
private final ScrollableHitSource.Checkpoint checkpoint;
6573

66-
public ReindexTaskStateDoc(ReindexRequest reindexRequest) {
67-
this(reindexRequest, null, null, null, (RestStatus) null, null);
74+
public ReindexTaskStateDoc(ReindexRequest reindexRequest, List<? extends Collection<String>> indexGroups) {
75+
this(reindexRequest, indexGroups, null, null, null, (RestStatus) null, null);
6876
}
6977

70-
public ReindexTaskStateDoc(ReindexRequest reindexRequest, @Nullable Long allocationId,
78+
public ReindexTaskStateDoc(ReindexRequest reindexRequest, List<? extends Collection<String>> indexGroups, @Nullable Long allocationId,
7179
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
7280
@Nullable Integer failureStatusCode, ScrollableHitSource.Checkpoint checkpoint) {
73-
this(reindexRequest, allocationId, reindexResponse, exception,
81+
this(reindexRequest, indexGroups, allocationId, reindexResponse, exception,
7482
failureStatusCode == null ? null : RestStatus.fromCode(failureStatusCode), checkpoint);
7583
}
7684

77-
public ReindexTaskStateDoc(ReindexRequest reindexRequest, @Nullable Long allocationId,
85+
public ReindexTaskStateDoc(ReindexRequest reindexRequest, List<? extends Collection<String>> indexGroups, @Nullable Long allocationId,
7886
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
7987
@Nullable ScrollableHitSource.Checkpoint checkpoint) {
80-
this(reindexRequest, allocationId, reindexResponse, exception, exception != null ? exception.status() : null, checkpoint);
88+
this(reindexRequest, indexGroups, allocationId, reindexResponse,
89+
exception, exception != null ? exception.status() : null, checkpoint);
8190
}
8291

83-
private ReindexTaskStateDoc(ReindexRequest reindexRequest, @Nullable Long allocationId,
92+
private ReindexTaskStateDoc(ReindexRequest reindexRequest, List<? extends Collection<String>> indexGroups, @Nullable Long allocationId,
8493
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
8594
@Nullable RestStatus failureStatusCode, @Nullable ScrollableHitSource.Checkpoint checkpoint) {
95+
this.indexGroups = indexGroups;
8696
this.allocationId = allocationId;
8797
assert (reindexResponse == null) || (exception == null) : "Either response or exception must be null";
8898
this.reindexRequest = reindexRequest;
@@ -97,6 +107,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
97107
builder.startObject();
98108
builder.field(REINDEX_REQUEST);
99109
reindexRequest.toXContent(builder, params, true);
110+
builder.field(INDEX_GROUPS, indexGroups);
100111
if (allocationId != null) {
101112
builder.field(ALLOCATION, allocationId);
102113
}
@@ -128,6 +139,10 @@ public ReindexRequest getReindexRequest() {
128139
return reindexRequest;
129140
}
130141

142+
public List<? extends Collection<String>> getIndexGroups() {
143+
return indexGroups;
144+
}
145+
131146
public BulkByScrollResponse getReindexResponse() {
132147
return reindexResponse;
133148
}
@@ -150,15 +165,17 @@ public Long getAllocationId() {
150165

151166
public ReindexTaskStateDoc withCheckpoint(ScrollableHitSource.Checkpoint checkpoint, BulkByScrollTask.Status status) {
152167
// todo: also store and resume from status.
153-
return new ReindexTaskStateDoc(reindexRequest, allocationId, reindexResponse, exception, failureStatusCode, checkpoint);
168+
return new ReindexTaskStateDoc(reindexRequest, indexGroups, allocationId, reindexResponse,
169+
exception, failureStatusCode, checkpoint);
154170
}
155171

156172
public ReindexTaskStateDoc withNewAllocation(long newAllocationId) {
157-
return new ReindexTaskStateDoc(reindexRequest, newAllocationId, reindexResponse, exception, failureStatusCode, checkpoint);
173+
return new ReindexTaskStateDoc(reindexRequest, indexGroups, newAllocationId, reindexResponse,
174+
exception, failureStatusCode, checkpoint);
158175
}
159176

160177
public ReindexTaskStateDoc withFinishedState(@Nullable BulkByScrollResponse reindexResponse,
161178
@Nullable ElasticsearchException exception) {
162-
return new ReindexTaskStateDoc(reindexRequest, allocationId, reindexResponse, exception, checkpoint);
179+
return new ReindexTaskStateDoc(reindexRequest, indexGroups, allocationId, reindexResponse, exception, checkpoint);
163180
}
164181
}

modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportStartReindexTaskAction.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,10 @@ protected void doExecute(Task task, StartReindexTaskAction.Request request, Acti
103103

104104
// In the current implementation, we only need to store task results if we do not wait for completion
105105
boolean storeTaskResult = request.getWaitForCompletion() == false;
106-
ReindexTaskParams job = new ReindexTaskParams(storeTaskResult, resolveIndexPatterns(request.getReindexRequest()), included);
106+
ReindexTaskParams job = new ReindexTaskParams(storeTaskResult, included);
107107

108-
ReindexTaskStateDoc reindexState = new ReindexTaskStateDoc(request.getReindexRequest());
108+
ReindexTaskStateDoc reindexState =
109+
new ReindexTaskStateDoc(request.getReindexRequest(), resolveIndexPatterns(request.getReindexRequest()));
109110
reindexIndexClient.createReindexTaskDoc(generatedId, reindexState, new ActionListener<>() {
110111
@Override
111112
public void onResponse(ReindexTaskState taskState) {

modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexIndexClientTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.cluster.service.ClusterService;
2525
import org.elasticsearch.test.ESSingleNodeTestCase;
2626

27+
import java.util.Collections;
2728
import java.util.concurrent.TimeUnit;
2829

2930
import static org.elasticsearch.index.reindex.ReindexIndexClient.REINDEX_ALIAS;
@@ -36,7 +37,7 @@ public void testAliasAndIndexCreated() {
3637
ReindexIndexClient client = new ReindexIndexClient(client(), getInstanceFromNode(ClusterService.class), null);
3738

3839
PlainActionFuture<ReindexTaskState> future = new PlainActionFuture<>();
39-
client.createReindexTaskDoc(randomAlphaOfLength(5), new ReindexTaskStateDoc(new ReindexRequest()), future);
40+
client.createReindexTaskDoc(randomAlphaOfLength(5), new ReindexTaskStateDoc(new ReindexRequest(), Collections.emptyList()), future);
4041
future.actionGet(10, TimeUnit.SECONDS);
4142

4243
GetAliasesResponse aliases = client().admin().indices().prepareGetAliases(REINDEX_ALIAS).get();

modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTaskParamsTests.java renamed to modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTaskStateDocTests.java

+7-24
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,12 @@
1818
*/
1919
package org.elasticsearch.index.reindex;
2020

21-
import org.elasticsearch.common.io.stream.BytesStreamOutput;
22-
import org.elasticsearch.common.io.stream.StreamInput;
2321
import org.elasticsearch.common.xcontent.XContentParser;
2422
import org.elasticsearch.test.AbstractXContentTestCase;
2523

2624
import java.io.IOException;
2725
import java.util.ArrayList;
2826
import java.util.Collection;
29-
import java.util.HashMap;
3027
import java.util.HashSet;
3128
import java.util.List;
3229
import java.util.Set;
@@ -35,13 +32,12 @@
3532

3633

3734
// todo: test more then the index_groups field.
38-
public class ReindexTaskParamsTests extends AbstractXContentTestCase<ReindexTaskParams> {
35+
public class ReindexTaskStateDocTests extends AbstractXContentTestCase<ReindexTaskStateDoc> {
3936

4037
@Override
41-
protected ReindexTaskParams createTestInstance() {
42-
return new ReindexTaskParams(randomBoolean(),
43-
IntStream.range(0, randomInt(20)).mapToObj(i -> setOrListOfStrings()).collect(Collectors.toList()),
44-
new HashMap<>());
38+
protected ReindexTaskStateDoc createTestInstance() {
39+
return new ReindexTaskStateDoc(new ReindexRequest().setDestIndex("test"),
40+
IntStream.range(0, randomInt(20)).mapToObj(i -> setOrListOfStrings()).collect(Collectors.toList()));
4541
}
4642

4743
private Collection<String> setOrListOfStrings() {
@@ -54,8 +50,8 @@ private Collection<String> setOrListOfStrings() {
5450
}
5551

5652
@Override
57-
protected ReindexTaskParams doParseInstance(XContentParser parser) throws IOException {
58-
return ReindexTaskParams.fromXContent(parser);
53+
protected ReindexTaskStateDoc doParseInstance(XContentParser parser) throws IOException {
54+
return ReindexTaskStateDoc.fromXContent(parser);
5955
}
6056

6157
@Override
@@ -64,25 +60,12 @@ protected boolean supportsUnknownFields() {
6460
}
6561

6662
@Override
67-
protected void assertEqualInstances(ReindexTaskParams expectedInstance, ReindexTaskParams newInstance) {
63+
protected void assertEqualInstances(ReindexTaskStateDoc expectedInstance, ReindexTaskStateDoc newInstance) {
6864
assertNotSame(newInstance, expectedInstance);
6965

7066
List<Set<String>> expectedGroups = expectedInstance.getIndexGroups().stream().map(HashSet::new).collect(Collectors.toList());
7167
List<Set<String>> newGroups = expectedInstance.getIndexGroups().stream().map(HashSet::new).collect(Collectors.toList());
7268

7369
assertEquals(expectedGroups, newGroups);
7470
}
75-
76-
public void testSerialization() throws IOException {
77-
ReindexTaskParams before = createTestInstance();
78-
79-
final BytesStreamOutput out = new BytesStreamOutput();
80-
before.writeTo(out);
81-
82-
final StreamInput in = out.bytes().streamInput();
83-
84-
ReindexTaskParams after = new ReindexTaskParams(in);
85-
86-
assertEqualInstances(before, after);
87-
}
8871
}

modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTaskStateUpdaterTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ private void createDoc(ReindexIndexClient client, String taskId) {
266266
reindex().source("source").destination("dest").refresh(true).request().setCheckpointInterval(TimeValue.ZERO);
267267

268268
PlainActionFuture<ReindexTaskState> future = PlainActionFuture.newFuture();
269-
client.createReindexTaskDoc(taskId, new ReindexTaskStateDoc(request), future);
269+
client.createReindexTaskDoc(taskId, new ReindexTaskStateDoc(request, Collections.emptyList()), future);
270270
future.actionGet();
271271
}
272272

0 commit comments

Comments
 (0)