Reindex resume from status #49255


@@ -68,7 +68,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void startSlicedAc
Client client,
DiscoveryNode node,
Runnable workerAction) {
initTaskState(task, request, client, new ActionListener<>() {
initTaskState(task, request, null, client, new ActionListener<>() {
@Override
public void onResponse(Void aVoid) {
executeSlicedAction(task, request, action, listener, client, node, workerAction);
@@ -119,6 +119,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void executeSliced
static <Request extends AbstractBulkByScrollRequest<Request>> void initTaskState(
BulkByScrollTask task,
Request request,
BulkByScrollTask.Status checkpointStatus,
Client client,
ActionListener<Void> listener) {
int configuredSlices = request.getSlices();
@@ -128,7 +129,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void initTaskState
client.admin().cluster().searchShards(shardsRequest, new ActionListener<>() {
@Override
public void onResponse(ClusterSearchShardsResponse response) {
setWorkerCount(request, task, countSlicesBasedOnShards(response));
setWorkerCount(request, task, countSlicesBasedOnShards(response), checkpointStatus);
listener.onResponse(null);
}

@@ -138,21 +139,23 @@ public void onFailure(Exception e) {
}
});
} else {
setWorkerCount(request, task, configuredSlices);
setWorkerCount(request, task, configuredSlices, checkpointStatus);
listener.onResponse(null);
}
}

private static <Request extends AbstractBulkByScrollRequest<Request>> void setWorkerCount(
Request request,
BulkByScrollTask task,
int slices) {
int slices,
BulkByScrollTask.Status checkpointStatus) {
if (slices > 1) {
assert checkpointStatus == null : "slices are not resilient";
task.setWorkerCount(slices);
} else {
SliceBuilder sliceBuilder = request.getSearchRequest().source().slice();
Integer sliceId = sliceBuilder == null ? null : sliceBuilder.getId();
task.setWorker(request.getRequestsPerSecond(), sliceId);
task.setWorker(request.getRequestsPerSecond(), sliceId, checkpointStatus);
}
}
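
For orientation, the sketch below condenses the branching that the new checkpointStatus parameter threads through. Every type in it is a simplified stand-in made up for illustration, not the real Elasticsearch class: a sliced request still becomes a leader task and must not carry a resume status, while an unsliced request becomes a worker that can be seeded from a previously persisted status.

// Simplified stand-ins for illustration; these are not the real Elasticsearch types.
final class WorkerOrLeaderSketch {

    /** Snapshot of progress counters, standing in for BulkByScrollTask.Status. */
    static final class Status {
        final long created;
        final long updated;

        Status(long created, long updated) {
            this.created = created;
            this.updated = updated;
        }
    }

    /** Minimal task surface, standing in for BulkByScrollTask. */
    interface Task {
        void setWorkerCount(int slices);
        void setWorker(float requestsPerSecond, Integer sliceId, Status checkpointStatus);
    }

    // Mirrors the branching in setWorkerCount(...) above: only the single-worker path
    // may be seeded from a previously persisted status, because sliced (parallel)
    // execution is not resilient and therefore never has a checkpoint to resume from.
    static void assignWork(Task task, int slices, float requestsPerSecond, Status checkpointStatus) {
        if (slices > 1) {
            assert checkpointStatus == null : "slices are not resilient";
            task.setWorkerCount(slices);
        } else {
            task.setWorker(requestsPerSecond, null, checkpointStatus);
        }
    }
}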

@@ -159,10 +159,9 @@ public void onFailure(Exception e) {
public void onResponse(ReindexTaskStateDoc stateDoc) {
ReindexRequest reindexRequest = stateDoc.getRethrottledReindexRequest();
description = reindexRequest.getDescription();
reindexer.initTask(childTask, reindexRequest, new ActionListener<>() {
reindexer.initTask(childTask, reindexRequest, stateDoc.getCheckpointStatus(), new ActionListener<>() {
@Override
public void onResponse(Void aVoid) {
// TODO: need to store status in state so we can continue from it.
transientStatus = childTask.getStatus();
performReindex(reindexRequest, reindexTaskParams, stateDoc, taskUpdater);
}
@@ -36,8 +36,9 @@ public class ReindexTaskStateDoc implements ToXContentObject {

public static final ConstructingObjectParser<ReindexTaskStateDoc, Void> PARSER =
new ConstructingObjectParser<>("reindex/index_state", a -> new ReindexTaskStateDoc((ReindexRequest) a[0], (Boolean) a[1],
(Long) a[2], toTaskId((String) a[3]), (BulkByScrollResponse) a[4], (ElasticsearchException) a[5], (Integer) a[6],
(ScrollableHitSource.Checkpoint) a[7], (float) a[8]));
(Long) a[2], toTaskId((String) a[3]), (BulkByScrollResponse) a[4],
(ElasticsearchException) a[5], (Integer) a[6],
(ScrollableHitSource.Checkpoint) a[7], (BulkByScrollTask.Status) a[8], (float) a[9]));

private static final String REINDEX_REQUEST = "request";
private static final String RESILIENT = "resilient";
@@ -47,6 +48,7 @@ public class ReindexTaskStateDoc implements ToXContentObject {
private static final String REINDEX_EXCEPTION = "exception";
private static final String FAILURE_REST_STATUS = "failure_rest_status";
private static final String REINDEX_CHECKPOINT = "checkpoint";
private static final String REINDEX_CHECKPOINT_STATUS = "checkpoint_status";
private static final String REQUESTS_PER_SECOND = "requests_per_second";

static {
@@ -62,6 +64,8 @@ public class ReindexTaskStateDoc implements ToXContentObject {
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), new ParseField(FAILURE_REST_STATUS));
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> ScrollableHitSource.Checkpoint.fromXContent(p),
new ParseField(REINDEX_CHECKPOINT));
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> BulkByScrollTask.Status.fromXContent(p),
new ParseField(REINDEX_CHECKPOINT_STATUS));
PARSER.declareFloat(ConstructingObjectParser.constructorArg(), new ParseField(REQUESTS_PER_SECOND));
}

@@ -73,33 +77,37 @@ public class ReindexTaskStateDoc implements ToXContentObject {
private final ElasticsearchException exception;
private final RestStatus failureStatusCode;
private final ScrollableHitSource.Checkpoint checkpoint;
private final BulkByScrollTask.Status checkpointStatus;
private final float requestsPerSecond;

public ReindexTaskStateDoc(ReindexRequest reindexRequest, boolean resilient) {
this(reindexRequest, resilient, null, null, null, null, (RestStatus) null, null, reindexRequest.getRequestsPerSecond());
this(reindexRequest, resilient, null, null, null, null, (RestStatus) null, null, null, reindexRequest.getRequestsPerSecond());
}

private ReindexTaskStateDoc(ReindexRequest reindexRequest, boolean resilient, @Nullable Long allocationId,
@Nullable TaskId ephemeralTaskId, @Nullable BulkByScrollResponse reindexResponse,
@Nullable ElasticsearchException exception,
@Nullable Integer failureStatusCode, ScrollableHitSource.Checkpoint checkpoint, float requestsPerSecond) {
@Nullable Integer failureStatusCode,
ScrollableHitSource.Checkpoint checkpoint, @Nullable BulkByScrollTask.Status checkpointStatus,
float requestsPerSecond) {
this(reindexRequest, resilient, allocationId, ephemeralTaskId, reindexResponse, exception,
failureStatusCode == null ? null : RestStatus.fromCode(failureStatusCode), checkpoint, requestsPerSecond);
failureStatusCode == null ? null : RestStatus.fromCode(failureStatusCode), checkpoint, checkpointStatus, requestsPerSecond);
}

private ReindexTaskStateDoc(ReindexRequest reindexRequest, boolean resilient, @Nullable Long allocationId,
@Nullable TaskId ephemeralTaskId, @Nullable BulkByScrollResponse reindexResponse,
@Nullable ElasticsearchException exception,
@Nullable ScrollableHitSource.Checkpoint checkpoint, float requestsPerSecond) {
@Nullable ScrollableHitSource.Checkpoint checkpoint, @Nullable BulkByScrollTask.Status checkpointStatus,
float requestsPerSecond) {
this(reindexRequest, resilient, allocationId, ephemeralTaskId,
reindexResponse, exception, exception != null ? exception.status() : null,
checkpoint, requestsPerSecond);
checkpoint, checkpointStatus, requestsPerSecond);
}

private ReindexTaskStateDoc(ReindexRequest reindexRequest, boolean resilient, @Nullable Long allocationId,
@Nullable TaskId ephemeralTaskId,
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
@Nullable RestStatus failureStatusCode, @Nullable ScrollableHitSource.Checkpoint checkpoint,
@Nullable RestStatus failureStatusCode,
@Nullable ScrollableHitSource.Checkpoint checkpoint, @Nullable BulkByScrollTask.Status checkpointStatus,
float requestsPerSecond) {
this.reindexRequest = reindexRequest;
this.resilient = resilient;
@@ -109,10 +117,12 @@ private ReindexTaskStateDoc(ReindexRequest reindexRequest, boolean resilient, @N
assert Float.isNaN(requestsPerSecond) == false && requestsPerSecond >= 0;
this.requestsPerSecond = requestsPerSecond;
assert (reindexResponse == null) || (exception == null) : "Either response or exception must be null";
assert checkpoint != null || checkpointStatus == null : "Can only have checkpointStatus if checkpoint is set";
this.reindexResponse = reindexResponse;
this.exception = exception;
this.failureStatusCode = failureStatusCode;
this.checkpoint = checkpoint;
this.checkpointStatus = checkpointStatus;
}

@Override
@@ -143,6 +153,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (checkpoint != null) {
builder.field(REINDEX_CHECKPOINT);
checkpoint.toXContent(builder, params);
if (checkpointStatus != null) {
builder.field(REINDEX_CHECKPOINT_STATUS);
checkpointStatus.toXContent(builder, params);
}
}
builder.field(REQUESTS_PER_SECOND, requestsPerSecond);
return builder.endObject();
@@ -188,6 +202,10 @@ public ScrollableHitSource.Checkpoint getCheckpoint() {
return checkpoint;
}

public BulkByScrollTask.Status getCheckpointStatus() {
return checkpointStatus;
}

public Long getAllocationId() {
return allocationId;
}
@@ -203,22 +221,22 @@ public float getRequestsPerSecond() {
public ReindexTaskStateDoc withCheckpoint(ScrollableHitSource.Checkpoint checkpoint, BulkByScrollTask.Status status) {
// todo: also store and resume from status.
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, ephemeralTaskId,
reindexResponse, exception, failureStatusCode, checkpoint, requestsPerSecond);
reindexResponse, exception, failureStatusCode, checkpoint, status, requestsPerSecond);
}

public ReindexTaskStateDoc withNewAllocation(long newAllocationId, TaskId ephemeralTaskId) {
return new ReindexTaskStateDoc(reindexRequest, resilient, newAllocationId, ephemeralTaskId,
reindexResponse, exception, failureStatusCode, checkpoint, requestsPerSecond);
reindexResponse, exception, failureStatusCode, checkpoint, checkpointStatus, requestsPerSecond);
}

public ReindexTaskStateDoc withFinishedState(@Nullable BulkByScrollResponse reindexResponse,
@Nullable ElasticsearchException exception) {
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, ephemeralTaskId, reindexResponse, exception, checkpoint,
requestsPerSecond);
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, ephemeralTaskId, reindexResponse, exception,
checkpoint, checkpointStatus, requestsPerSecond);
}

public ReindexTaskStateDoc withRequestsPerSecond(float requestsPerSecond) {
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, ephemeralTaskId, reindexResponse, exception, checkpoint,
requestsPerSecond);
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, ephemeralTaskId, reindexResponse, exception,
checkpoint, checkpointStatus, requestsPerSecond);
}
}
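
ReindexTaskStateDoc is immutable, so each withX copy method has to carry the new checkpointStatus field forward explicitly; dropping it in any one of them would silently lose the resume point. Below is a condensed sketch of that pattern, with placeholder String fields standing in for the real request/checkpoint/status types.

// Condensed illustration of the copy-on-write pattern used by ReindexTaskStateDoc above.
// The String fields are placeholders, not the real Elasticsearch classes.
final class StateDocSketch {
    final String request;
    final String checkpoint;
    final String checkpointStatus;

    StateDocSketch(String request, String checkpoint, String checkpointStatus) {
        // A status only makes sense relative to the checkpoint it was captured with.
        assert checkpoint != null || checkpointStatus == null : "Can only have checkpointStatus if checkpoint is set";
        this.request = request;
        this.checkpoint = checkpoint;
        this.checkpointStatus = checkpointStatus;
    }

    // Records a new checkpoint together with the status captured at that point.
    StateDocSketch withCheckpoint(String newCheckpoint, String statusAtCheckpoint) {
        return new StateDocSketch(request, newCheckpoint, statusAtCheckpoint);
    }

    // Every other copy method must carry checkpointStatus forward unchanged,
    // otherwise the resume point is silently dropped.
    StateDocSketch withRequest(String newRequest) {
        return new StateDocSketch(newRequest, checkpoint, checkpointStatus);
    }
}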
@@ -92,12 +92,11 @@ public class Reindexer {
this.reindexSslConfig = reindexSslConfig;
}

public void initTask(BulkByScrollTask task, ReindexRequest request,
public void initTask(BulkByScrollTask task, ReindexRequest request, BulkByScrollTask.Status checkpointStatus,
ActionListener<Void> listener) {
BulkByScrollParallelizationHelper.initTaskState(task, request, client, listener);
BulkByScrollParallelizationHelper.initTaskState(task, request, checkpointStatus, client, listener);
}

// todo: this may need a way to relay back that it failed and this reindex instance should stand down?
public interface CheckpointListener {
void onCheckpoint(ScrollableHitSource.Checkpoint checkpoint, BulkByScrollTask.Status status);
}
@@ -142,6 +142,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(reindexResponse);
}

public String getPersistentTaskId() {
return persistentTaskId;
}

public String getEphemeralTaskId() {
return ephemeralTaskId;
}
@@ -150,10 +154,6 @@ public BulkByScrollResponse getReindexResponse() {
return reindexResponse;
}

public String getPersistentTaskId() {
return persistentTaskId;
}

public static Response fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
@@ -60,7 +60,7 @@ public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFi
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
reindexValidator.initialValidation(request);
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
reindexer.initTask(bulkByScrollTask, request, new ActionListener<>() {
reindexer.initTask(bulkByScrollTask, request, null, new ActionListener<>() {
@Override
public void onResponse(Void v) {
// the transport action is never resilient, neither when directly invoked due to "es.reindex.resilience" system
@@ -153,7 +153,7 @@ public void setupForTest() {
scrollId = null;
taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest);
testTask.setWorker(testRequest.getRequestsPerSecond(), null);
testTask.setWorker(testRequest.getRequestsPerSecond(), null, null);
worker = testTask.getWorkerState();

localNode = new DiscoveryNode("thenode", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
@@ -181,6 +181,12 @@ public void testReindexFailover() throws Throwable {

assertBusy(MockSearchService::assertNoInFlightContext, 10 + scrollTimeout, TimeUnit.SECONDS);

@SuppressWarnings("unchecked")
Map<String, Object> dotReindexResponse =
(Map<String, Object>) client().prepareGet(".reindex", response.getPersistentTaskId()).get().getSource().get("response");
assertThat((int) dotReindexResponse.get("created") + (int) dotReindexResponse.get("updated"),
greaterThanOrEqualTo(docCount));

// TODO: Add mechanism to wait for reindex task to complete
}

@@ -150,8 +150,6 @@ public Settings onNodeStopped(String nodeName) throws Exception {

Map<String, Object> reindexResponse = client().admin().cluster().prepareGetTask(taskId).setWaitForCompletion(true)
.get(TimeValue.timeValueSeconds(30)).getTask().getResponseAsMap();
// todo: this assert fails sometimes due to missing retry on transport closed
// assertThat(bulkByScrollResponse.getBulkFailures(), Matchers.empty());
assertEquals(Collections.emptyList(), reindexResponse.get("failures"));

assertSameDocs(numberOfDocuments, "test", "dest");
@@ -63,11 +63,11 @@
*
* When the request is sliced, this task can either represent a coordinating task (using
* {@link BulkByScrollTask#setWorkerCount(int)}) or a worker task that performs search queries (using
* {@link BulkByScrollTask#setWorker(float, Integer)}).
* {@link BulkByScrollTask#setWorker(float, Integer, Status)}).
*
* We don't always know if this task will be a leader or worker task when it's created, because if slices is set to "auto" it may
* be either depending on the number of shards in the source indices. We figure that out when the request is handled and set it on this
* class with {@link #setWorkerCount(int)} or {@link #setWorker(float, Integer)}.
* class with {@link #setWorkerCount(int)} or {@link #setWorker(float, Integer, Status)}.
*/
public class BulkByScrollTask extends CancellableTask {

@@ -157,16 +157,17 @@ public boolean isWorker() {
* Sets this task to be a worker task that performs search requests
* @param requestsPerSecond How many search requests per second this task should make
* @param sliceId If this is is a sliced task, which slice number this task corresponds to. Null if not sliced.
* @param checkpointStatus the status to resume the worker from or null if this task does not resume from previous state
*/
public void setWorker(float requestsPerSecond, @Nullable Integer sliceId) {
public void setWorker(float requestsPerSecond, @Nullable Integer sliceId, Status checkpointStatus) {
if (isWorker()) {
throw new IllegalStateException("This task is already a worker");
}
if (isLeader()) {
throw new IllegalStateException("This task is already a leader for other slice subtasks");
}

workerState = new WorkerBulkByScrollTaskState(this, sliceId, requestsPerSecond);
workerState = new WorkerBulkByScrollTaskState(this, sliceId, requestsPerSecond, checkpointStatus);
if (isCancelled()) {
workerState.handleCancel();
}
@@ -632,7 +633,7 @@ public XContentBuilder innerXContent(XContentBuilder builder, Params params)
public static Status fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token;
if (parser.currentToken() == Token.START_OBJECT) {
token = parser.nextToken();
token = parser.currentToken();
} else {
token = parser.nextToken();
}
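
The final hunk also touches Status.fromXContent: previously both branches advanced the parser with nextToken(), so a parser already positioned on START_OBJECT (as it presumably is when the status is parsed as the nested checkpoint_status object) would have its first token skipped; the fixed version keeps the current token in that case. A minimal sketch of the intended pattern, using a hypothetical parser interface rather than the real XContentParser:

// Hypothetical minimal parser surface for illustration; the real code uses XContentParser.
interface ParserSketch {
    String currentToken();
    String nextToken();
}

final class FromXContentSketch {
    // Returns the token that marks the start of the object, whether or not the
    // caller has already advanced the parser onto it.
    static String startToken(ParserSketch parser) {
        if ("START_OBJECT".equals(parser.currentToken())) {
            return parser.currentToken(); // keep the current token (the fixed behaviour)
        }
        return parser.nextToken();        // otherwise advance onto the object start
    }
}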