Skip to content

Reindex resume from status #49255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void startSlicedAc
Client client,
DiscoveryNode node,
Runnable workerAction) {
initTaskState(task, request, client, new ActionListener<>() {
initTaskState(task, request, null, client, new ActionListener<>() {
@Override
public void onResponse(Void aVoid) {
executeSlicedAction(task, request, action, listener, client, node, workerAction);
Expand Down Expand Up @@ -119,6 +119,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void executeSliced
static <Request extends AbstractBulkByScrollRequest<Request>> void initTaskState(
BulkByScrollTask task,
Request request,
BulkByScrollTask.Status checkpointStatus,
Client client,
ActionListener<Void> listener) {
int configuredSlices = request.getSlices();
Expand All @@ -128,7 +129,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void initTaskState
client.admin().cluster().searchShards(shardsRequest, new ActionListener<>() {
@Override
public void onResponse(ClusterSearchShardsResponse response) {
setWorkerCount(request, task, countSlicesBasedOnShards(response));
setWorkerCount(request, task, countSlicesBasedOnShards(response), checkpointStatus);
listener.onResponse(null);
}

Expand All @@ -138,21 +139,23 @@ public void onFailure(Exception e) {
}
});
} else {
setWorkerCount(request, task, configuredSlices);
setWorkerCount(request, task, configuredSlices, checkpointStatus);
listener.onResponse(null);
}
}

private static <Request extends AbstractBulkByScrollRequest<Request>> void setWorkerCount(
Request request,
BulkByScrollTask task,
int slices) {
int slices,
BulkByScrollTask.Status checkpointStatus) {
if (slices > 1) {
assert checkpointStatus == null : "slices are not resilient";
task.setWorkerCount(slices);
} else {
SliceBuilder sliceBuilder = request.getSearchRequest().source().slice();
Integer sliceId = sliceBuilder == null ? null : sliceBuilder.getId();
task.setWorker(request.getRequestsPerSecond(), sliceId);
task.setWorker(request.getRequestsPerSecond(), sliceId, checkpointStatus);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,9 @@ public void onFailure(Exception e) {
public void onResponse(ReindexTaskStateDoc stateDoc) {
ReindexRequest reindexRequest = stateDoc.getReindexRequest();
description = reindexRequest.getDescription();
reindexer.initTask(childTask, reindexRequest, new ActionListener<>() {
reindexer.initTask(childTask, reindexRequest, stateDoc.getCheckpointStatus(), new ActionListener<>() {
@Override
public void onResponse(Void aVoid) {
// TODO: need to store status in state so we can continue from it.
transientStatus = childTask.getStatus();
performReindex(reindexJob, stateDoc, taskUpdater);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ public class ReindexTaskStateDoc implements ToXContentObject {

public static final ConstructingObjectParser<ReindexTaskStateDoc, Void> PARSER =
new ConstructingObjectParser<>("reindex/index_state", a -> new ReindexTaskStateDoc((ReindexRequest) a[0], (Long) a[1],
(BulkByScrollResponse) a[2], (ElasticsearchException) a[3], (Integer) a[4], (ScrollableHitSource.Checkpoint) a[5]));
(BulkByScrollResponse) a[2], (ElasticsearchException) a[3], (Integer) a[4],
(ScrollableHitSource.Checkpoint) a[5], (BulkByScrollTask.Status) a[6]));

private static final String REINDEX_REQUEST = "request";
private static final String ALLOCATION = "allocation";
private static final String REINDEX_RESPONSE = "response";
private static final String REINDEX_EXCEPTION = "exception";
private static final String FAILURE_REST_STATUS = "failure_rest_status";
private static final String REINDEX_CHECKPOINT = "checkpoint";
private static final String REINDEX_CHECKPOINT_STATUS = "checkpoint_status";

static {
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ReindexRequest.fromXContentWithParams(p),
Expand All @@ -54,6 +56,8 @@ public class ReindexTaskStateDoc implements ToXContentObject {
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), new ParseField(FAILURE_REST_STATUS));
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> ScrollableHitSource.Checkpoint.fromXContent(p),
new ParseField(REINDEX_CHECKPOINT));
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> BulkByScrollTask.Status.fromXContent(p),
new ParseField(REINDEX_CHECKPOINT_STATUS));
}

private final ReindexRequest reindexRequest;
Expand All @@ -62,34 +66,40 @@ public class ReindexTaskStateDoc implements ToXContentObject {
private final ElasticsearchException exception;
private final RestStatus failureStatusCode;
private final ScrollableHitSource.Checkpoint checkpoint;
private final BulkByScrollTask.Status checkpointStatus;

public ReindexTaskStateDoc(ReindexRequest reindexRequest) {
this(reindexRequest, null, null, null, (RestStatus) null, null);
this(reindexRequest, null, null, null, (RestStatus) null, null, null);
}

public ReindexTaskStateDoc(ReindexRequest reindexRequest, @Nullable Long allocationId,
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
@Nullable Integer failureStatusCode, ScrollableHitSource.Checkpoint checkpoint) {
@Nullable Integer failureStatusCode,
@Nullable ScrollableHitSource.Checkpoint checkpoint, @Nullable BulkByScrollTask.Status checkpointStatus) {
this(reindexRequest, allocationId, reindexResponse, exception,
failureStatusCode == null ? null : RestStatus.fromCode(failureStatusCode), checkpoint);
failureStatusCode == null ? null : RestStatus.fromCode(failureStatusCode), checkpoint, checkpointStatus);
}

public ReindexTaskStateDoc(ReindexRequest reindexRequest, @Nullable Long allocationId,
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
@Nullable ScrollableHitSource.Checkpoint checkpoint) {
this(reindexRequest, allocationId, reindexResponse, exception, exception != null ? exception.status() : null, checkpoint);
@Nullable ScrollableHitSource.Checkpoint checkpoint, @Nullable BulkByScrollTask.Status checkpointStatus) {
this(reindexRequest, allocationId, reindexResponse, exception, exception != null ? exception.status() : null,
checkpoint, checkpointStatus);
}

private ReindexTaskStateDoc(ReindexRequest reindexRequest, @Nullable Long allocationId,
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
@Nullable RestStatus failureStatusCode, @Nullable ScrollableHitSource.Checkpoint checkpoint) {
this.allocationId = allocationId;
@Nullable RestStatus failureStatusCode,
@Nullable ScrollableHitSource.Checkpoint checkpoint, @Nullable BulkByScrollTask.Status checkpointStatus) {
assert (reindexResponse == null) || (exception == null) : "Either response or exception must be null";
assert checkpoint != null || checkpointStatus == null : "Can only have checkpointStatus if checkpoint is set";
this.allocationId = allocationId;
this.reindexRequest = reindexRequest;
this.reindexResponse = reindexResponse;
this.exception = exception;
this.failureStatusCode = failureStatusCode;
this.checkpoint = checkpoint;
this.checkpointStatus = checkpointStatus;
}

@Override
Expand All @@ -116,6 +126,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (checkpoint != null) {
builder.field(REINDEX_CHECKPOINT);
checkpoint.toXContent(builder, params);
if (checkpointStatus != null) {
builder.field(REINDEX_CHECKPOINT_STATUS);
checkpointStatus.toXContent(builder, params);
}
}
return builder.endObject();
}
Expand Down Expand Up @@ -144,21 +158,25 @@ public ScrollableHitSource.Checkpoint getCheckpoint() {
return checkpoint;
}

/**
 * Returns the task status stored together with the checkpoint, or {@code null} if none was stored.
 * The constructor asserts that a status is only present when a checkpoint is set.
 */
public BulkByScrollTask.Status getCheckpointStatus() {
return checkpointStatus;
}

public Long getAllocationId() {
return allocationId;
}

public ReindexTaskStateDoc withCheckpoint(ScrollableHitSource.Checkpoint checkpoint, BulkByScrollTask.Status status) {
// todo: also store and resume from status.
return new ReindexTaskStateDoc(reindexRequest, allocationId, reindexResponse, exception, failureStatusCode, checkpoint);
return new ReindexTaskStateDoc(reindexRequest, allocationId, reindexResponse, exception, failureStatusCode, checkpoint, status);
}

public ReindexTaskStateDoc withNewAllocation(long newAllocationId) {
return new ReindexTaskStateDoc(reindexRequest, newAllocationId, reindexResponse, exception, failureStatusCode, checkpoint);
return new ReindexTaskStateDoc(reindexRequest, newAllocationId, reindexResponse, exception, failureStatusCode,
checkpoint, checkpointStatus);
}

public ReindexTaskStateDoc withFinishedState(@Nullable BulkByScrollResponse reindexResponse,
@Nullable ElasticsearchException exception) {
return new ReindexTaskStateDoc(reindexRequest, allocationId, reindexResponse, exception, checkpoint);
return new ReindexTaskStateDoc(reindexRequest, allocationId, reindexResponse, exception, checkpoint, checkpointStatus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ public class Reindexer {
this.reindexSslConfig = reindexSslConfig;
}

public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListener<Void> listener) {
BulkByScrollParallelizationHelper.initTaskState(task, request, client, listener);
public void initTask(BulkByScrollTask task, ReindexRequest request, BulkByScrollTask.Status checkpointStatus,
ActionListener<Void> listener) {
BulkByScrollParallelizationHelper.initTaskState(task, request, checkpointStatus, client, listener);
}

// todo: this may need a way to relay back that it failed and this reindex instance should stand down?
public interface CheckpointListener {
void onCheckpoint(ScrollableHitSource.Checkpoint checkpoint, BulkByScrollTask.Status status);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,45 +95,55 @@ public boolean getWaitForCompletion() {
public static class Response extends ActionResponse {

static final ParseField TASK_ID = new ParseField("task_id");
static final ParseField PERSISTENT_TASK_ID = new ParseField("persistent_task_id");
static final ParseField REINDEX_RESPONSE = new ParseField("reindex_response");

private static final ConstructingObjectParser<Response, Void> PARSER = new ConstructingObjectParser<>(
"start_reindex_response", true, args -> new Response((String) args[0], (BulkByScrollResponse) args[1]));
"start_reindex_response", true, args -> new Response((String) args[0], (String) args[1], (BulkByScrollResponse) args[2]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), TASK_ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), PERSISTENT_TASK_ID);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> BulkByScrollResponse.fromXContent(parser), REINDEX_RESPONSE);
}

private final String taskId;
private final String persistentTaskId;
@Nullable private final BulkByScrollResponse reindexResponse;

public Response(String taskId) {
this(taskId, null);
public Response(String taskId, String persistentTaskId) {
this(taskId, persistentTaskId, null);
}

public Response(String taskId, BulkByScrollResponse reindexResponse) {
public Response(String taskId, String persistentTaskId, BulkByScrollResponse reindexResponse) {
this.taskId = taskId;
this.persistentTaskId = persistentTaskId;
this.reindexResponse = reindexResponse;
}

/**
 * Deserializes a response from the transport stream.
 * Fields are read in the same order {@link #writeTo} writes them: task id, persistent task id,
 * then the optional reindex response.
 */
public Response(StreamInput in) throws IOException {
super(in);
taskId = in.readString();
persistentTaskId = in.readString();
// Read as optional: the field is declared @Nullable and is written with writeOptionalWriteable.
reindexResponse = in.readOptionalWriteable(BulkByScrollResponse::new);
}

/**
 * Serializes this response to the transport stream: task id, persistent task id, then the
 * optional reindex response. The order must stay in sync with the {@code StreamInput} constructor.
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
// NOTE(review): another hunk of this change constructs a Response with a null task id;
// writeString presumably does not accept null -- confirm, or make both sides use an optional string.
out.writeString(taskId);
out.writeString(persistentTaskId);
out.writeOptionalWriteable(reindexResponse);
}

public String getTaskId() {
return taskId;
}

public String getPersistentTaskId() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just working on this on a different branch, and I feel like the persistent task id should get the default naming. That is, should its getter be getTaskId or getReindexTaskId, and should the other one be getEphemeralTaskId, getCoordinatorNodeTaskId, or getInitialEphemeralTaskId?

return persistentTaskId;
}

public BulkByScrollResponse getReindexResponse() {
return reindexResponse;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFi
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
reindexValidator.initialValidation(request);
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
reindexer.initTask(bulkByScrollTask, request, new ActionListener<>() {
reindexer.initTask(bulkByScrollTask, request, null, new ActionListener<>() {
@Override
public void onResponse(Void v) {
// the transport action is never resilient, neither when directly invoked due to "es.reindex.resilience" system
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ReindexJob>
public void onResponse(ReindexTaskState taskState) {
ReindexTaskStateDoc reindexState = taskState.getStateDoc();
if (reindexState.getException() == null) {
listener.onResponse(new StartReindexJobAction.Response(taskId, reindexState.getReindexResponse()));
listener.onResponse(new StartReindexJobAction.Response(null, taskId,
reindexState.getReindexResponse()));
} else {
Exception exception = reindexState.getException();
RestStatus statusCode = reindexState.getFailureStatusCode();
Expand Down Expand Up @@ -153,7 +154,7 @@ private void waitForReindexTask(String taskId, ActionListener<StartReindexJobAct
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ReindexJob> task) {
ReindexJobState state = (ReindexJobState) task.getState();
listener.onResponse(new StartReindexJobAction.Response(state.getEphemeralTaskId().toString()));
listener.onResponse(new StartReindexJobAction.Response(state.getEphemeralTaskId().toString(), taskId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void setupForTest() {
scrollId = null;
taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest);
testTask.setWorker(testRequest.getRequestsPerSecond(), null);
testTask.setWorker(testRequest.getRequestsPerSecond(), null, null);
worker = testTask.getWorkerState();

localNode = new DiscoveryNode("thenode", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ public void testReindexFailover() throws Throwable {

assertBusy(MockSearchService::assertNoInFlightContext, 10 + scrollTimeout, TimeUnit.SECONDS);

@SuppressWarnings("unchecked")
Map<String, Object> dotReindexResponse =
(Map<String, Object>) client().prepareGet(".reindex", response.getPersistentTaskId()).get().getSource().get("response");
assertThat((int) dotReindexResponse.get("created") + (int) dotReindexResponse.get("updated"),
greaterThanOrEqualTo(docCount));

// TODO: Add mechanism to wait for reindex task to complete
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ public void testDataNodeRestart() throws Exception {

Map<String, Object> reindexResponse = client().admin().cluster().prepareGetTask(taskId).setWaitForCompletion(true)
.get(TimeValue.timeValueSeconds(30)).getTask().getResponseAsMap();
// todo: this assert fails sometimes due to missing retry on transport closed
// assertThat(bulkByScrollResponse.getBulkFailures(), Matchers.empty());
assertEquals(Collections.emptyList(), reindexResponse.get("failures"));

assertSameDocs(numberOfDocuments, "test", "dest");
Expand Down
Loading