Skip to content

Commit 7120dfc

Browse files
Reindex rethrottle persistent task
This adds support for rethrottling a resilient/persistent reindex task by updating its state in the .reindex index and then notifying the running task. Persisting the value ensures that the new throttle survives failovers, and notifying the task ensures that it wakes up immediately if it was waiting on a very low throttle value. Related to elastic#42612.
1 parent 552c417 commit 7120dfc

18 files changed

+423
-78
lines changed

modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollParallelizationHelper.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void startSlicedAc
6868
Client client,
6969
DiscoveryNode node,
7070
Runnable workerAction) {
71-
initTaskState(task, request, client, new ActionListener<>() {
71+
initTaskState(task, request, request.getRequestsPerSecond(), client, new ActionListener<>() {
7272
@Override
7373
public void onResponse(Void aVoid) {
7474
executeSlicedAction(task, request, action, listener, client, node, workerAction);
@@ -119,6 +119,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void executeSliced
119119
static <Request extends AbstractBulkByScrollRequest<Request>> void initTaskState(
120120
BulkByScrollTask task,
121121
Request request,
122+
float requestsPerSecond,
122123
Client client,
123124
ActionListener<Void> listener) {
124125
int configuredSlices = request.getSlices();
@@ -128,7 +129,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void initTaskState
128129
client.admin().cluster().searchShards(shardsRequest, new ActionListener<>() {
129130
@Override
130131
public void onResponse(ClusterSearchShardsResponse response) {
131-
setWorkerCount(request, task, countSlicesBasedOnShards(response));
132+
setWorkerCount(request, requestsPerSecond, task, countSlicesBasedOnShards(response));
132133
listener.onResponse(null);
133134
}
134135

@@ -138,21 +139,22 @@ public void onFailure(Exception e) {
138139
}
139140
});
140141
} else {
141-
setWorkerCount(request, task, configuredSlices);
142+
setWorkerCount(request, requestsPerSecond, task, configuredSlices);
142143
listener.onResponse(null);
143144
}
144145
}
145146

146147
private static <Request extends AbstractBulkByScrollRequest<Request>> void setWorkerCount(
147148
Request request,
149+
float requestsPerSecond,
148150
BulkByScrollTask task,
149151
int slices) {
150152
if (slices > 1) {
151153
task.setWorkerCount(slices);
152154
} else {
153155
SliceBuilder sliceBuilder = request.getSearchRequest().source().slice();
154156
Integer sliceId = sliceBuilder == null ? null : sliceBuilder.getId();
155-
task.setWorker(request.getRequestsPerSecond(), sliceId);
157+
task.setWorker(requestsPerSecond, sliceId);
156158
}
157159
}
158160

modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexPlugin.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ public class ReindexPlugin extends Plugin implements ActionPlugin, PersistentTas
7272
new ActionHandler<>(UpdateByQueryAction.INSTANCE, TransportUpdateByQueryAction.class),
7373
new ActionHandler<>(DeleteByQueryAction.INSTANCE, TransportDeleteByQueryAction.class),
7474
new ActionHandler<>(RethrottleAction.INSTANCE, TransportRethrottleAction.class),
75-
new ActionHandler<>(StartReindexTaskAction.INSTANCE, TransportStartReindexTaskAction.class)
75+
new ActionHandler<>(StartReindexTaskAction.INSTANCE, TransportStartReindexTaskAction.class),
76+
new ActionHandler<>(RethrottlePersistentReindexAction.INSTANCE, TransportRethrottlePersistentReindexAction.class)
7677
);
7778
}
7879

modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexTask.java

+19-4
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.script.ScriptService;
3737
import org.elasticsearch.tasks.Task;
3838
import org.elasticsearch.tasks.TaskId;
39+
import org.elasticsearch.tasks.TaskInfo;
3940
import org.elasticsearch.tasks.TaskManager;
4041
import org.elasticsearch.threadpool.ThreadPool;
4142

@@ -138,11 +139,25 @@ BulkByScrollTask getChildTask() {
138139
return childTask;
139140
}
140141

142+
private void rethrottle(float requestsPerSecond) {
143+
TransportRethrottleAction.rethrottle(logger, client.getLocalNodeId(), client, childTask, requestsPerSecond,
144+
new ActionListener<>() {
145+
@Override
146+
public void onResponse(TaskInfo taskInfo) {
147+
}
148+
149+
@Override
150+
public void onFailure(Exception e) {
151+
assert false : e;
152+
logger.error("Unable to rethrottle [{}]", getPersistentTaskId());
153+
}
154+
});
155+
}
156+
141157
private void execute(ReindexTaskParams reindexTaskParams) {
142158
long allocationId = getAllocationId();
143-
144159
ReindexTaskStateUpdater taskUpdater = new ReindexTaskStateUpdater(reindexIndexClient, client.threadPool(), getPersistentTaskId(),
145-
allocationId, new ActionListener<>() {
160+
allocationId, taskId, new ActionListener<>() {
146161
@Override
147162
public void onResponse(ReindexTaskStateDoc stateDoc) {
148163
reindexDone(stateDoc, reindexTaskParams.shouldStoreResult());
@@ -153,14 +168,14 @@ public void onFailure(Exception e) {
153168
logger.info("Reindex task failed", e);
154169
updateClusterStateToFailed(reindexTaskParams.shouldStoreResult(), ReindexPersistentTaskState.Status.DONE, e);
155170
}
156-
}, this::handleCheckpointAssignmentConflict);
171+
}, this::handleCheckpointAssignmentConflict, this::rethrottle);
157172

158173
taskUpdater.assign(new ActionListener<>() {
159174
@Override
160175
public void onResponse(ReindexTaskStateDoc stateDoc) {
161176
ReindexRequest reindexRequest = stateDoc.getReindexRequest();
162177
description = reindexRequest.getDescription();
163-
reindexer.initTask(childTask, reindexRequest, new ActionListener<>() {
178+
reindexer.initTask(childTask, reindexRequest, stateDoc.getRequestsPerSecond(), new ActionListener<>() {
164179
@Override
165180
public void onResponse(Void aVoid) {
166181
// TODO: need to store status in state so we can continue from it.

modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexTaskStateDoc.java

+57-15
Original file line numberDiff line numberDiff line change
@@ -27,63 +27,81 @@
2727
import org.elasticsearch.common.xcontent.XContentBuilder;
2828
import org.elasticsearch.common.xcontent.XContentParser;
2929
import org.elasticsearch.rest.RestStatus;
30+
import org.elasticsearch.tasks.TaskId;
3031

3132
import java.io.IOException;
3233

3334
public class ReindexTaskStateDoc implements ToXContentObject {
3435

3536
public static final ConstructingObjectParser<ReindexTaskStateDoc, Void> PARSER =
3637
new ConstructingObjectParser<>("reindex/index_state", a -> new ReindexTaskStateDoc((ReindexRequest) a[0], (Long) a[1],
37-
(BulkByScrollResponse) a[2], (ElasticsearchException) a[3], (Integer) a[4], (ScrollableHitSource.Checkpoint) a[5]));
38+
toTaskId((String) a[2]), (BulkByScrollResponse) a[3], (ElasticsearchException) a[4], (Integer) a[5],
39+
(ScrollableHitSource.Checkpoint) a[6],
40+
(float) a[7]));
3841

3942
private static final String REINDEX_REQUEST = "request";
4043
private static final String ALLOCATION = "allocation";
44+
private static final String EPHEMERAL_TASK_ID = "ephemeral_task_id";
4145
private static final String REINDEX_RESPONSE = "response";
4246
private static final String REINDEX_EXCEPTION = "exception";
4347
private static final String FAILURE_REST_STATUS = "failure_rest_status";
4448
private static final String REINDEX_CHECKPOINT = "checkpoint";
49+
private static final String REQUESTS_PER_SECOND = "requests_per_second";
4550

4651
static {
4752
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ReindexRequest.fromXContentWithParams(p),
4853
new ParseField(REINDEX_REQUEST));
4954
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), new ParseField(ALLOCATION));
55+
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), new ParseField(EPHEMERAL_TASK_ID));
5056
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> BulkByScrollResponse.fromXContent(p),
5157
new ParseField(REINDEX_RESPONSE));
5258
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> ElasticsearchException.fromXContent(p),
5359
new ParseField(REINDEX_EXCEPTION));
5460
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), new ParseField(FAILURE_REST_STATUS));
5561
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> ScrollableHitSource.Checkpoint.fromXContent(p),
5662
new ParseField(REINDEX_CHECKPOINT));
63+
PARSER.declareFloat(ConstructingObjectParser.constructorArg(), new ParseField(REQUESTS_PER_SECOND));
5764
}
5865

5966
private final ReindexRequest reindexRequest;
6067
private final Long allocationId;
68+
private final TaskId ephemeralTaskId;
6169
private final BulkByScrollResponse reindexResponse;
6270
private final ElasticsearchException exception;
6371
private final RestStatus failureStatusCode;
6472
private final ScrollableHitSource.Checkpoint checkpoint;
73+
private final float requestsPerSecond;
6574

6675
public ReindexTaskStateDoc(ReindexRequest reindexRequest) {
67-
this(reindexRequest, null, null, null, (RestStatus) null, null);
76+
this(reindexRequest, null, null, null, null, (RestStatus) null, null, reindexRequest.getRequestsPerSecond());
6877
}
6978

7079
public ReindexTaskStateDoc(ReindexRequest reindexRequest, @Nullable Long allocationId,
71-
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
72-
@Nullable Integer failureStatusCode, ScrollableHitSource.Checkpoint checkpoint) {
73-
this(reindexRequest, allocationId, reindexResponse, exception,
74-
failureStatusCode == null ? null : RestStatus.fromCode(failureStatusCode), checkpoint);
80+
@Nullable TaskId ephemeralTaskId, @Nullable BulkByScrollResponse reindexResponse,
81+
@Nullable ElasticsearchException exception,
82+
@Nullable Integer failureStatusCode, ScrollableHitSource.Checkpoint checkpoint, float requestsPerSecond) {
83+
this(reindexRequest, allocationId, ephemeralTaskId, reindexResponse, exception,
84+
failureStatusCode == null ? null : RestStatus.fromCode(failureStatusCode), checkpoint, requestsPerSecond);
7585
}
7686

7787
public ReindexTaskStateDoc(ReindexRequest reindexRequest, @Nullable Long allocationId,
78-
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
79-
@Nullable ScrollableHitSource.Checkpoint checkpoint) {
80-
this(reindexRequest, allocationId, reindexResponse, exception, exception != null ? exception.status() : null, checkpoint);
88+
@Nullable TaskId ephemeralTaskId, @Nullable BulkByScrollResponse reindexResponse,
89+
@Nullable ElasticsearchException exception,
90+
@Nullable ScrollableHitSource.Checkpoint checkpoint, float requestsPerSecond) {
91+
this(reindexRequest, allocationId, ephemeralTaskId, reindexResponse, exception, exception != null ? exception.status() : null,
92+
checkpoint, requestsPerSecond);
8193
}
8294

8395
private ReindexTaskStateDoc(ReindexRequest reindexRequest, @Nullable Long allocationId,
84-
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
85-
@Nullable RestStatus failureStatusCode, @Nullable ScrollableHitSource.Checkpoint checkpoint) {
96+
@Nullable TaskId ephemeralTaskId, @Nullable BulkByScrollResponse reindexResponse,
97+
@Nullable ElasticsearchException exception,
98+
@Nullable RestStatus failureStatusCode, @Nullable ScrollableHitSource.Checkpoint checkpoint,
99+
float requestsPerSecond) {
100+
assert (allocationId == null) == (ephemeralTaskId == null);
86101
this.allocationId = allocationId;
102+
this.ephemeralTaskId = ephemeralTaskId;
103+
assert Float.isNaN(requestsPerSecond) == false && requestsPerSecond >= 0;
104+
this.requestsPerSecond = requestsPerSecond;
87105
assert (reindexResponse == null) || (exception == null) : "Either response or exception must be null";
88106
this.reindexRequest = reindexRequest;
89107
this.reindexResponse = reindexResponse;
@@ -100,6 +118,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
100118
if (allocationId != null) {
101119
builder.field(ALLOCATION, allocationId);
102120
}
121+
if (ephemeralTaskId != null) {
122+
builder.field(EPHEMERAL_TASK_ID, ephemeralTaskId.toString());
123+
}
103124
if (reindexResponse != null) {
104125
builder.field(REINDEX_RESPONSE);
105126
builder.startObject();
@@ -117,13 +138,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
117138
builder.field(REINDEX_CHECKPOINT);
118139
checkpoint.toXContent(builder, params);
119140
}
141+
builder.field(REQUESTS_PER_SECOND, requestsPerSecond);
120142
return builder.endObject();
121143
}
122144

123145
public static ReindexTaskStateDoc fromXContent(XContentParser parser) {
124146
return PARSER.apply(parser, null);
125147
}
126148

149+
private static TaskId toTaskId(String s) {
150+
return s != null ? new TaskId(s) : null;
151+
}
152+
127153
public ReindexRequest getReindexRequest() {
128154
return reindexRequest;
129155
}
@@ -148,17 +174,33 @@ public Long getAllocationId() {
148174
return allocationId;
149175
}
150176

177+
public TaskId getEphemeralTaskId() {
178+
return ephemeralTaskId;
179+
}
180+
181+
public float getRequestsPerSecond() {
182+
return requestsPerSecond;
183+
}
184+
151185
public ReindexTaskStateDoc withCheckpoint(ScrollableHitSource.Checkpoint checkpoint, BulkByScrollTask.Status status) {
152186
// todo: also store and resume from status.
153-
return new ReindexTaskStateDoc(reindexRequest, allocationId, reindexResponse, exception, failureStatusCode, checkpoint);
187+
return new ReindexTaskStateDoc(reindexRequest, allocationId, ephemeralTaskId, reindexResponse, exception, failureStatusCode,
188+
checkpoint, requestsPerSecond);
154189
}
155190

156-
public ReindexTaskStateDoc withNewAllocation(long newAllocationId) {
157-
return new ReindexTaskStateDoc(reindexRequest, newAllocationId, reindexResponse, exception, failureStatusCode, checkpoint);
191+
public ReindexTaskStateDoc withNewAllocation(long newAllocationId, TaskId ephemeralTaskId) {
192+
return new ReindexTaskStateDoc(reindexRequest, newAllocationId, ephemeralTaskId, reindexResponse, exception, failureStatusCode,
193+
checkpoint, requestsPerSecond);
158194
}
159195

160196
public ReindexTaskStateDoc withFinishedState(@Nullable BulkByScrollResponse reindexResponse,
161197
@Nullable ElasticsearchException exception) {
162-
return new ReindexTaskStateDoc(reindexRequest, allocationId, reindexResponse, exception, checkpoint);
198+
return new ReindexTaskStateDoc(reindexRequest, allocationId, ephemeralTaskId, reindexResponse, exception, checkpoint,
199+
requestsPerSecond);
200+
}
201+
202+
public ReindexTaskStateDoc withRequestsPerSecond(float requestsPerSecond) {
203+
return new ReindexTaskStateDoc(reindexRequest, allocationId, ephemeralTaskId, reindexResponse, exception, checkpoint,
204+
requestsPerSecond);
163205
}
164206
}

0 commit comments

Comments
 (0)