-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Store reindexing result in reindex index #45260
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Store reindexing result in reindex index #45260
Conversation
Pinging @elastic/es-distributed |
@ywelsch @henningandersen - This work has exposed a few issues. Currently Any thought how I should go about fixing this? |
I could serialize the response as a raw byte field using the internal transport serialization? Or I could add the status codes as a field in the x content? |
We discussed this yesterday and decided to go with adding the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Thanks @tbrooks8. I added a few smaller comments and then a couple to handle in follow-ups.
assert (reindexResponse == null) || (jobException == null) : "Either response or exception must be null"; | ||
this.reindexResponse = reindexResponse; | ||
this.jobException = jobException; | ||
this.status = status; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we assert that status is != null? Mostly to ensure that the isDone() method cannot return true on null.
}); | ||
} | ||
|
||
public void createReindexTaskDoc(String taskId, ReindexTaskIndexState reindexState, boolean indexExists, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be nice to make this method private and instead add a method that does not have the indexExists
flag. The ReindexClient
should then also receive the ClusterService in its constructor, enabling it to make the check on whether the index exists itself.
updateClusterStateToFailed(shouldStoreResult, ReindexJobState.Status.FAILED_TO_WRITE_TO_REINDEX_INDEX, ex); | ||
} | ||
}); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: superfluous newline.
TaskManager taskManager = getTaskManager(); | ||
assert taskManager != null : "TaskManager should have been set before reindex started"; | ||
|
||
updatePersistentTaskState(new ReindexJobState(taskId, null, wrapException(ex)), new ActionListener<>() { | ||
ReindexTaskIndexState reindexState = new ReindexTaskIndexState(reindexRequest, response, null); | ||
reindexIndexClient.updateReindexTaskDoc(getPersistentTaskId(), reindexState, new ActionListener<>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We now store the result in to two places (3 if we include the .tasks index), both the index and persistent cluster state. I think we risk storing the index state and then dying - and waking up to not be able to read the index. We would then have the index indicate success and the task indicate failure. Also, we risk someone seeing that it succeeded in the index and then afterwards it failed (this I think we cannot guarantee 100% but can likely handle better). I think resolving these things are definitely outside the scope of this PR, but wanted to mention it here for awareness.
} | ||
|
||
private void sendStartedNotification(boolean shouldStoreResult, Runnable listener) { | ||
updatePersistentTaskState(new ReindexJobState(taskId, null, null), new ActionListener<>() { | ||
private void updateClusterStateToStarted(boolean shouldStoreResult, Runnable listener) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this name is the best name? I think the status would already be STARTED? Or is there a null status initially? I lean towards preferring the old sendStartedNotification
name.
} else { | ||
listener.onFailure(reindexState.getException()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we then delete the persistent task, given that we responded and everything is done? Probably to be done in a follow-up rather than in this PR.
@@ -128,11 +115,30 @@ private void waitForReindexDone(String taskId, ActionListener<StartReindexJobAct | |||
@Override | |||
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ReindexJob> task) { | |||
ReindexJobState state = (ReindexJobState) task.getState(); | |||
if (state.getJobException() == null) { | |||
listener.onResponse(new StartReindexJobAction.Response(taskId, state.getReindexResponse())); | |||
if (state.getStatus() == ReindexJobState.Status.FAILED_TO_READ_FROM_REINDEX_INDEX) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also for a follow-up: it could be legitimate that we cannot read from the reindex index, for instance in a full system restart situation. I wonder if we should add some kind of retries with backoff to handle this?
Currently the result of a reindex persistent task is propogated and
stored in the cluster state. This commit changes this so that only the
ephemeral task-id, headers, and reindex state is store in the cluster
state. Any result (exception or response) is stored in the reindex
index.
Relates to #42612.