Skip to content

Commit 0ba9ad9

Browse files
Reindex rethrottle through persistent task
This adds support for rethrottling resilient reindex through updating the persistent task, ensuring that rethrottle sticks on failovers. Related to elastic#42612
1 parent e12b58f commit 0ba9ad9

21 files changed

+627
-24
lines changed

modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollParallelizationHelper.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void startSlicedAc
6868
Client client,
6969
DiscoveryNode node,
7070
Runnable workerAction) {
71-
initTaskState(task, request, client, new ActionListener<>() {
71+
initTaskState(task, request, request.getRequestsPerSecond(), client, new ActionListener<>() {
7272
@Override
7373
public void onResponse(Void aVoid) {
7474
executeSlicedAction(task, request, action, listener, client, node, workerAction);
@@ -119,6 +119,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void executeSliced
119119
static <Request extends AbstractBulkByScrollRequest<Request>> void initTaskState(
120120
BulkByScrollTask task,
121121
Request request,
122+
float requestsPerSecond,
122123
Client client,
123124
ActionListener<Void> listener) {
124125
int configuredSlices = request.getSlices();
@@ -128,7 +129,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void initTaskState
128129
client.admin().cluster().searchShards(shardsRequest, new ActionListener<>() {
129130
@Override
130131
public void onResponse(ClusterSearchShardsResponse response) {
131-
setWorkerCount(request, task, countSlicesBasedOnShards(response));
132+
setWorkerCount(request, requestsPerSecond, task, countSlicesBasedOnShards(response));
132133
listener.onResponse(null);
133134
}
134135

@@ -138,21 +139,22 @@ public void onFailure(Exception e) {
138139
}
139140
});
140141
} else {
141-
setWorkerCount(request, task, configuredSlices);
142+
setWorkerCount(request, requestsPerSecond, task, configuredSlices);
142143
listener.onResponse(null);
143144
}
144145
}
145146

146147
private static <Request extends AbstractBulkByScrollRequest<Request>> void setWorkerCount(
147148
Request request,
149+
float requestsPerSecond,
148150
BulkByScrollTask task,
149151
int slices) {
150152
if (slices > 1) {
151153
task.setWorkerCount(slices);
152154
} else {
153155
SliceBuilder sliceBuilder = request.getSearchRequest().source().slice();
154156
Integer sliceId = sliceBuilder == null ? null : sliceBuilder.getId();
155-
task.setWorker(request.getRequestsPerSecond(), sliceId);
157+
task.setWorker(requestsPerSecond, sliceId);
156158
}
157159
}
158160

modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexPlugin.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.env.Environment;
3939
import org.elasticsearch.env.NodeEnvironment;
4040
import org.elasticsearch.persistent.PersistentTaskParams;
41+
import org.elasticsearch.persistent.PersistentTaskParamsUpdateFunction;
4142
import org.elasticsearch.persistent.PersistentTaskState;
4243
import org.elasticsearch.persistent.PersistentTasksExecutor;
4344
import org.elasticsearch.plugins.ActionPlugin;
@@ -72,7 +73,8 @@ public class ReindexPlugin extends Plugin implements ActionPlugin, PersistentTas
7273
new ActionHandler<>(UpdateByQueryAction.INSTANCE, TransportUpdateByQueryAction.class),
7374
new ActionHandler<>(DeleteByQueryAction.INSTANCE, TransportDeleteByQueryAction.class),
7475
new ActionHandler<>(RethrottleAction.INSTANCE, TransportRethrottleAction.class),
75-
new ActionHandler<>(StartReindexTaskAction.INSTANCE, TransportStartReindexTaskAction.class)
76+
new ActionHandler<>(StartReindexTaskAction.INSTANCE, TransportStartReindexTaskAction.class),
77+
new ActionHandler<>(RethrottlePersistentReindexAction.INSTANCE, TransportRethrottlePersistentReindexAction.class)
7678
);
7779
}
7880

@@ -82,7 +84,10 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
8284
new NamedWriteableRegistry.Entry(Task.Status.class, BulkByScrollTask.Status.NAME, BulkByScrollTask.Status::new),
8385
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, ReindexTaskParams.NAME, ReindexTaskParams::new),
8486
new NamedWriteableRegistry.Entry(Task.Status.class, ReindexPersistentTaskState.NAME, ReindexPersistentTaskState::new),
85-
new NamedWriteableRegistry.Entry(PersistentTaskState.class, ReindexPersistentTaskState.NAME, ReindexPersistentTaskState::new));
87+
new NamedWriteableRegistry.Entry(PersistentTaskState.class, ReindexPersistentTaskState.NAME, ReindexPersistentTaskState::new),
88+
new NamedWriteableRegistry.Entry(PersistentTaskParamsUpdateFunction.class,
89+
TransportRethrottlePersistentReindexAction.RethrottlePersistentTaskFunction.NAME,
90+
TransportRethrottlePersistentReindexAction.RethrottlePersistentTaskFunction::new));
8691
}
8792

8893
@Override

modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexTask.java

+37-3
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,13 @@
3030
import org.elasticsearch.common.util.concurrent.ThreadContext;
3131
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3232
import org.elasticsearch.persistent.AllocatedPersistentTask;
33+
import org.elasticsearch.persistent.DynamicPersistentTasksExecutor;
3334
import org.elasticsearch.persistent.PersistentTaskState;
3435
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
35-
import org.elasticsearch.persistent.PersistentTasksExecutor;
3636
import org.elasticsearch.script.ScriptService;
3737
import org.elasticsearch.tasks.Task;
3838
import org.elasticsearch.tasks.TaskId;
39+
import org.elasticsearch.tasks.TaskInfo;
3940
import org.elasticsearch.tasks.TaskManager;
4041
import org.elasticsearch.threadpool.ThreadPool;
4142

@@ -57,8 +58,9 @@ public class ReindexTask extends AllocatedPersistentTask {
5758
private volatile BulkByScrollTask.Status transientStatus;
5859
private volatile String description;
5960
private volatile boolean assignmentConflictDetected;
61+
private volatile Float requestsPerSecond;
6062

61-
public static class ReindexPersistentTasksExecutor extends PersistentTasksExecutor<ReindexTaskParams> {
63+
public static class ReindexPersistentTasksExecutor extends DynamicPersistentTasksExecutor<ReindexTask, ReindexTaskParams> {
6264

6365
private final ClusterService clusterService;
6466
private final Client client;
@@ -92,6 +94,11 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
9294
Reindexer reindexer = new Reindexer(clusterService, client, threadPool, scriptService, reindexSslConfig);
9395
return new ReindexTask(id, type, action, parentTaskId, headers, clusterService, xContentRegistry, client, reindexer);
9496
}
97+
98+
@Override
99+
protected void paramsUpdated(ReindexTask task, ReindexTaskParams newParams) {
100+
task.requestsPerSecondUpdated(newParams.getRequestsPerSecond());
101+
}
95102
}
96103

97104
private ReindexTask(long id, String type, String action, TaskId parentTask, Map<String, String> headers,
@@ -138,6 +145,29 @@ BulkByScrollTask getChildTask() {
138145
return childTask;
139146
}
140147

148+
private void requestsPerSecondUpdated(float requestsPerSecond) {
149+
this.requestsPerSecond = requestsPerSecond;
150+
if (childTask.isLeader() || childTask.isWorker()) {
151+
// reindex is running
152+
rethrottle(requestsPerSecond);
153+
}
154+
}
155+
156+
private void rethrottle(float requestsPerSecond) {
157+
TransportRethrottleAction.rethrottle(logger, client.getLocalNodeId(), client, childTask, requestsPerSecond,
158+
new ActionListener<>() {
159+
@Override
160+
public void onResponse(TaskInfo taskInfo) {
161+
}
162+
163+
@Override
164+
public void onFailure(Exception e) {
165+
assert false;
166+
logger.error("Unable to rethrottle [{}]", getPersistentTaskId());
167+
}
168+
});
169+
}
170+
141171
private void execute(ReindexTaskParams reindexTaskParams) {
142172
long allocationId = getAllocationId();
143173

@@ -160,9 +190,13 @@ public void onFailure(Exception e) {
160190
public void onResponse(ReindexTaskStateDoc stateDoc) {
161191
ReindexRequest reindexRequest = stateDoc.getReindexRequest();
162192
description = reindexRequest.getDescription();
163-
reindexer.initTask(childTask, reindexRequest, new ActionListener<>() {
193+
reindexer.initTask(childTask, reindexRequest, reindexTaskParams.getRequestsPerSecond(), new ActionListener<>() {
164194
@Override
165195
public void onResponse(Void aVoid) {
196+
if (requestsPerSecond != null) {
197+
// updated while starting
198+
rethrottle(requestsPerSecond);
199+
}
166200
// TODO: need to store status in state so we can continue from it.
167201
transientStatus = childTask.getStatus();
168202
performReindex(reindexTaskParams, stateDoc, taskUpdater);

modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexTaskParams.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -30,34 +30,40 @@
3030

3131
import java.io.IOException;
3232
import java.util.Map;
33+
import java.util.Objects;
3334

3435
public class ReindexTaskParams implements PersistentTaskParams {
3536

3637
public static final String NAME = ReindexTask.NAME;
3738

3839
@SuppressWarnings("unchecked")
3940
public static final ConstructingObjectParser<ReindexTaskParams, Void> PARSER
40-
= new ConstructingObjectParser<>(NAME, a -> new ReindexTaskParams((Boolean) a[0], (Map<String, String>) a[1]));
41+
= new ConstructingObjectParser<>(NAME, a -> new ReindexTaskParams((Boolean) a[0], (Map<String, String>) a[1], (float) a[2]));
4142

4243
private static String STORE_RESULT = "store_result";
4344
private static String HEADERS = "headers";
45+
private static String REQUESTS_PER_SECOND = "requests_per_second";
4446

4547
static {
4648
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), new ParseField(STORE_RESULT));
4749
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), new ParseField(HEADERS));
50+
PARSER.declareFloat(ConstructingObjectParser.constructorArg(), new ParseField(REQUESTS_PER_SECOND));
4851
}
4952

5053
private final boolean storeResult;
5154
private final Map<String, String> headers;
55+
private final float requestsPerSecond;
5256

53-
public ReindexTaskParams(boolean storeResult, Map<String, String> headers) {
57+
public ReindexTaskParams(boolean storeResult, Map<String, String> headers, float requestsPerSecond) {
5458
this.storeResult = storeResult;
5559
this.headers = headers;
60+
this.requestsPerSecond = requestsPerSecond;
5661
}
5762

5863
public ReindexTaskParams(StreamInput in) throws IOException {
5964
storeResult = in.readBoolean();
6065
headers = in.readMap(StreamInput::readString, StreamInput::readString);
66+
requestsPerSecond = in.readFloat();
6167
}
6268

6369
@Override
@@ -75,13 +81,15 @@ public Version getMinimalSupportedVersion() {
7581
public void writeTo(StreamOutput out) throws IOException {
7682
out.writeBoolean(storeResult);
7783
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
84+
out.writeFloat(requestsPerSecond);
7885
}
7986

8087
@Override
8188
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
8289
builder.startObject();
8390
builder.field(STORE_RESULT, storeResult);
8491
builder.field(HEADERS, headers);
92+
builder.field(REQUESTS_PER_SECOND, requestsPerSecond);
8593
return builder.endObject();
8694
}
8795

@@ -93,7 +101,26 @@ public Map<String, String> getHeaders() {
93101
return headers;
94102
}
95103

104+
public float getRequestsPerSecond() {
105+
return requestsPerSecond;
106+
}
107+
96108
public static ReindexTaskParams fromXContent(XContentParser parser) {
97109
return PARSER.apply(parser, null);
98110
}
111+
112+
@Override
113+
public boolean equals(Object o) {
114+
if (this == o) return true;
115+
if (o == null || getClass() != o.getClass()) return false;
116+
ReindexTaskParams that = (ReindexTaskParams) o;
117+
return storeResult == that.storeResult &&
118+
Float.compare(that.requestsPerSecond, requestsPerSecond) == 0 &&
119+
Objects.equals(headers, that.headers);
120+
}
121+
122+
@Override
123+
public int hashCode() {
124+
return Objects.hash(storeResult, headers, requestsPerSecond);
125+
}
99126
}

modules/reindex/src/main/java/org/elasticsearch/index/reindex/Reindexer.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,9 @@ public class Reindexer {
9292
this.reindexSslConfig = reindexSslConfig;
9393
}
9494

95-
public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListener<Void> listener) {
96-
BulkByScrollParallelizationHelper.initTaskState(task, request, client, listener);
95+
public void initTask(BulkByScrollTask task, ReindexRequest request,
96+
float requestsPerSecond, ActionListener<Void> listener) {
97+
BulkByScrollParallelizationHelper.initTaskState(task, request, requestsPerSecond, client, listener);
9798
}
9899

99100
// todo: this may need a way to relay back that it failed and this reindex instance should stand down?

modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ public RestResponse buildResponse(StartReindexTaskAction.Response response, XCon
109109
builder.startObject();
110110
// This is the ephemeral task-id from the first node that is assigned the task (for BWC).
111111
builder.field("task", response.getTaskId());
112-
112+
// this is the new persistent task id
113+
builder.field("id", response.getPersistentTaskId());
113114
// TODO: Are there error conditions for the non-wait case?
114115
return new BytesRestResponse(RestStatus.OK, builder.endObject());
115116
}

modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestRethrottleAction.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.rest.BaseRestHandler;
2525
import org.elasticsearch.rest.RestController;
2626
import org.elasticsearch.rest.RestRequest;
27+
import org.elasticsearch.rest.action.RestToXContentListener;
2728
import org.elasticsearch.tasks.TaskId;
2829

2930
import java.util.function.Supplier;
@@ -48,15 +49,27 @@ public String getName() {
4849

4950
@Override
5051
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
51-
RethrottleRequest internalRequest = new RethrottleRequest();
52-
internalRequest.setTaskId(new TaskId(request.param("taskId")));
52+
String taskId = request.param("taskId");
5353
Float requestsPerSecond = AbstractBaseReindexRestHandler.parseRequestsPerSecond(request);
5454
if (requestsPerSecond == null) {
5555
throw new IllegalArgumentException("requests_per_second is a required parameter");
5656
}
57+
if (taskId.contains(":") == false) {
58+
return preparePersistentReindexRequest(taskId, client, requestsPerSecond);
59+
}
60+
RethrottleRequest internalRequest = new RethrottleRequest();
61+
internalRequest.setTaskId(new TaskId(taskId));
5762
internalRequest.setRequestsPerSecond(requestsPerSecond);
5863
final String groupBy = request.param("group_by", "nodes");
5964
return channel ->
6065
client.execute(RethrottleAction.INSTANCE, internalRequest, listTasksResponseListener(nodesInCluster, groupBy, channel));
6166
}
67+
68+
private RestChannelConsumer preparePersistentReindexRequest(String taskId, NodeClient client, float requestsPerSecond) {
69+
RethrottlePersistentReindexAction.Request internalRequest = new RethrottlePersistentReindexAction.Request(taskId,
70+
requestsPerSecond);
71+
72+
return channel ->
73+
client.execute(RethrottlePersistentReindexAction.INSTANCE, internalRequest, new RestToXContentListener<>(channel));
74+
}
6275
}

0 commit comments

Comments
 (0)