diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexJobState.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexJobState.java index 41196285c5782..b1e41dce04f6b 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexJobState.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexJobState.java @@ -19,8 +19,6 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -39,41 +37,32 @@ public class ReindexJobState implements Task.Status, PersistentTaskState { public static final String NAME = ReindexTask.NAME; public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>(NAME, a -> new ReindexJobState((String) a[0], (BulkByScrollResponse) a[1], - (ElasticsearchException) a[2])); + new ConstructingObjectParser<>(NAME, a -> new ReindexJobState((String) a[0], (String) a[1])); private static String EPHEMERAL_TASK_ID = "ephemeral_task_id"; - private static String REINDEX_RESPONSE = "reindex_response"; - private static String REINDEX_EXCEPTION = "reindex_exception"; + private static String STATUS = "status"; static { PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField(EPHEMERAL_TASK_ID)); - PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> BulkByScrollResponse.fromXContent(p), - new ParseField(REINDEX_RESPONSE)); - PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), - new ParseField(REINDEX_EXCEPTION)); + PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField(STATUS)); } private final TaskId ephemeralTaskId; - private final BulkByScrollResponse reindexResponse; - private final ElasticsearchException jobException; + private final Status status; - private ReindexJobState(String ephemeralTaskId, BulkByScrollResponse reindexResponse, ElasticsearchException jobException) { - this(new TaskId(ephemeralTaskId), reindexResponse, jobException); + private ReindexJobState(String ephemeralTaskId, String status) { + this(new TaskId(ephemeralTaskId), Status.valueOf(status)); } - ReindexJobState(TaskId ephemeralTaskId, @Nullable BulkByScrollResponse reindexResponse, - @Nullable ElasticsearchException jobException) { + ReindexJobState(TaskId ephemeralTaskId, Status status) { + assert status != null : "Status cannot be null"; this.ephemeralTaskId = ephemeralTaskId; - assert (reindexResponse == null) || (jobException == null) : "Either response or exception must be null"; - this.reindexResponse = reindexResponse; - this.jobException = jobException; + this.status = status; } public ReindexJobState(StreamInput in) throws IOException { ephemeralTaskId = TaskId.readFromStream(in); - reindexResponse = in.readOptionalWriteable(BulkByScrollResponse::new); - jobException = in.readException(); + status = in.readEnum(Status.class); } @Override @@ -84,35 +73,23 @@ public String getWriteableName() { @Override public void writeTo(StreamOutput out) throws IOException { ephemeralTaskId.writeTo(out); - out.writeOptionalWriteable(reindexResponse); - out.writeException(jobException); + out.writeEnum(status); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(EPHEMERAL_TASK_ID, ephemeralTaskId.toString()); - if (reindexResponse != null) { - builder.field(REINDEX_RESPONSE); - builder.startObject(); - reindexResponse.toXContent(builder, params); - builder.endObject(); - } - if (jobException != null) { - builder.field(REINDEX_EXCEPTION); - builder.startObject(); - jobException.toXContent(builder, params); - builder.endObject(); - } + builder.field(STATUS, status); return builder.endObject(); } - public BulkByScrollResponse getReindexResponse() { - return reindexResponse; + public boolean isDone() { + return status != Status.STARTED; } - public ElasticsearchException getJobException() { - return jobException; + public Status getStatus() { + return status; } public TaskId getEphemeralTaskId() { @@ -122,4 +99,11 @@ public TaskId getEphemeralTaskId() { public static ReindexJobState fromXContent(XContentParser parser) { return PARSER.apply(parser, null); } + + public enum Status { + STARTED, + FAILED_TO_READ_FROM_REINDEX_INDEX, + FAILED_TO_WRITE_TO_REINDEX_INDEX, + DONE + } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexTask.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexTask.java index 63b644aa4e89a..ac635c152f182 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexTask.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexTask.java @@ -23,31 +23,23 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.ScriptService; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.function.Supplier; @@ -58,14 +50,11 @@ public class ReindexTask extends AllocatedPersistentTask { // TODO: Name public static final String NAME = "reindex/job"; - // TODO: Eventually this should be an alias for index versioning - public static final String REINDEX_INDEX = ".reindex"; private final NodeClient client; - private final Client taskClient; + private final ReindexIndexClient reindexIndexClient; private final Reindexer reindexer; private final TaskId taskId; - private final NamedXContentRegistry xContentRegistry; private final BulkByScrollTask childTask; public static class ReindexPersistentTasksExecutor extends PersistentTasksExecutor { @@ -77,8 +66,8 @@ public static class ReindexPersistentTasksExecutor extends PersistentTasksExecut private final ReindexSslConfig reindexSslConfig; private final NamedXContentRegistry xContentRegistry; - public ReindexPersistentTasksExecutor(ClusterService clusterService, Client client, NamedXContentRegistry xContentRegistry, - ThreadPool threadPool, ScriptService scriptService, ReindexSslConfig reindexSslConfig) { + ReindexPersistentTasksExecutor(ClusterService clusterService, Client client, NamedXContentRegistry xContentRegistry, + ThreadPool threadPool, ScriptService scriptService, ReindexSslConfig reindexSslConfig) { super(NAME, ThreadPool.Names.GENERIC); this.clusterService = clusterService; this.client = client; @@ -108,9 +97,8 @@ private ReindexTask(long id, String type, String action, TaskId parentTask, Map< ClusterService clusterService, NamedXContentRegistry xContentRegistry, Client client, Reindexer reindexer) { // TODO: description super(id, type, action, "persistent reindex", parentTask, headers); - this.xContentRegistry = xContentRegistry; this.client = (NodeClient) client; - this.taskClient = new OriginSettingClient(client, ReindexTaskIndexState.REINDEX_ORIGIN); + this.reindexIndexClient = new ReindexIndexClient(client, clusterService, xContentRegistry); this.reindexer = reindexer; this.taskId = new TaskId(clusterService.localNode().getId(), id); this.childTask = new BulkByScrollTask(id, type, action, getDescription(), parentTask, headers); @@ -126,14 +114,11 @@ BulkByScrollTask getChildTask() { } private void execute(ReindexJob reindexJob) { - ThreadContext threadContext = client.threadPool().getThreadContext(); - Supplier context = threadContext.newRestorableContext(false); - - GetRequest getRequest = new GetRequest(REINDEX_INDEX).id(getPersistentTaskId()); - taskClient.get(getRequest, ActionListener.map(new ActionListener<>() { + reindexIndexClient.getReindexTaskDoc(getPersistentTaskId(), new ActionListener<>() { @Override - public void onResponse(ReindexRequest reindexRequest) { - Runnable performReindex = () -> performReindex(reindexJob, reindexRequest, context); + public void onResponse(ReindexTaskIndexState reindexTaskIndexState) { + ReindexRequest reindexRequest = reindexTaskIndexState.getReindexRequest(); + Runnable performReindex = () -> performReindex(reindexJob, reindexRequest); reindexer.initTask(childTask, reindexRequest, new ActionListener<>() { @Override public void onResponse(Void aVoid) { @@ -142,32 +127,21 @@ public void onResponse(Void aVoid) { @Override public void onFailure(Exception e) { - handleError(reindexJob.shouldStoreResult(), e); + handleError(reindexJob.shouldStoreResult(), reindexRequest, e); } }); } @Override - public void onFailure(Exception e) { - handleError(reindexJob.shouldStoreResult(), e); - + public void onFailure(Exception ex) { + logger.info("Failed to fetch reindex task doc", ex); + updateClusterStateToFailed(reindexJob.shouldStoreResult(), ReindexJobState.Status.FAILED_TO_READ_FROM_REINDEX_INDEX, ex); } - }, this::extractReindexRequest)); - } - - private ReindexRequest extractReindexRequest(GetResponse response) throws IOException { - BytesReference source = response.getSourceAsBytesRef(); - try (XContentParser parser = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, source, - XContentType.JSON)) { - ReindexTaskIndexState taskState = ReindexTaskIndexState.fromXContent(parser); - ReindexRequest reindexRequest = taskState.getReindexRequest(); - reindexRequest.setParentTask(taskId); - return reindexRequest; - } + }); } private void sendStartedNotification(boolean shouldStoreResult, Runnable listener) { - updatePersistentTaskState(new ReindexJobState(taskId, null, null), new ActionListener<>() { + updatePersistentTaskState(new ReindexJobState(taskId, ReindexJobState.Status.STARTED), new ActionListener<>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { listener.run(); @@ -175,82 +149,119 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask persisten @Override public void onFailure(Exception e) { - logger.info("Failed to update reindex persistent task with ephemeral id", e); - TaskManager taskManager = getTaskManager(); - assert taskManager != null : "TaskManager should have been set before reindex started"; - markEphemeralTaskFailed(shouldStoreResult, taskManager, e); + logger.info("Failed to update task in cluster state to started", e); + markEphemeralTaskFailed(shouldStoreResult, e); } }); } - private void performReindex(ReindexJob reindexJob, ReindexRequest reindexRequest, Supplier context) { - TaskManager taskManager = getTaskManager(); - assert taskManager != null : "TaskManager should have been set before reindex started"; - + private void performReindex(ReindexJob reindexJob, ReindexRequest reindexRequest) { ThreadContext threadContext = client.threadPool().getThreadContext(); + boolean shouldStoreResult = reindexJob.shouldStoreResult(); + Supplier context = threadContext.newRestorableContext(false); // TODO: Eventually we only want to retain security context try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, reindexJob.getHeaders())) { reindexer.execute(childTask, reindexRequest, new ContextPreservingActionListener<>(context, new ActionListener<>() { @Override public void onResponse(BulkByScrollResponse response) { - updatePersistentTaskState(new ReindexJobState(taskId, response, null), new ActionListener<>() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { - if (shouldStoreResult) { - taskManager.storeResult(ReindexTask.this, response, new ActionListener<>() { - @Override - public void onResponse(BulkByScrollResponse response) { - markAsCompleted(); - } - - @Override - public void onFailure(Exception e) { - logger.info("Failed to store task result", e); - markAsFailed(e); - } - }); - } else { - markAsCompleted(); - } - } - - @Override - public void onFailure(Exception e) { - logger.info("Failed to update task state to success", e); - markEphemeralTaskFailed(shouldStoreResult, taskManager, e); - } - }); + handleDone(shouldStoreResult, reindexRequest, response); } @Override public void onFailure(Exception ex) { - handleError(shouldStoreResult, ex); + handleError(shouldStoreResult, reindexRequest, ex); } })); } } - private void handleError(boolean shouldStoreResult, Exception ex) { + private void handleDone(boolean shouldStoreResult, ReindexRequest reindexRequest, BulkByScrollResponse response) { + TaskManager taskManager = getTaskManager(); + assert taskManager != null : "TaskManager should have been set before reindex started"; + + ReindexTaskIndexState reindexState = new ReindexTaskIndexState(reindexRequest, response, null, (RestStatus) null); + reindexIndexClient.updateReindexTaskDoc(getPersistentTaskId(), reindexState, new ActionListener<>() { + @Override + public void onResponse(Void v) { + updatePersistentTaskState(new ReindexJobState(taskId, ReindexJobState.Status.DONE), new ActionListener<>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { + if (shouldStoreResult) { + taskManager.storeResult(ReindexTask.this, response, new ActionListener<>() { + @Override + public void onResponse(BulkByScrollResponse response) { + markAsCompleted(); + } + + @Override + public void onFailure(Exception ex) { + logger.info("Failed to store task result", ex); + markAsFailed(ex); + } + }); + } else { + markAsCompleted(); + } + } + + @Override + public void onFailure(Exception ex) { + logger.info("Failed to update task in cluster state to success", ex); + markEphemeralTaskFailed(shouldStoreResult, ex); + } + }); + } + + @Override + public void onFailure(Exception ex) { + logger.info("Failed to write result to reindex index", ex); + updateClusterStateToFailed(shouldStoreResult, ReindexJobState.Status.FAILED_TO_WRITE_TO_REINDEX_INDEX, ex); + } + }); + } + + private void handleError(boolean shouldStoreResult, ReindexRequest reindexRequest, Exception ex) { TaskManager taskManager = getTaskManager(); assert taskManager != null : "TaskManager should have been set before reindex started"; - updatePersistentTaskState(new ReindexJobState(taskId, null, wrapException(ex)), new ActionListener<>() { + ElasticsearchException exception = wrapException(ex); + ReindexTaskIndexState reindexState = new ReindexTaskIndexState(reindexRequest, null, exception, exception.status()); + + reindexIndexClient.updateReindexTaskDoc(getPersistentTaskId(), reindexState, new ActionListener<>() { + @Override + public void onResponse(Void v) { + updateClusterStateToFailed(shouldStoreResult, ReindexJobState.Status.DONE, ex); + } + + @Override + public void onFailure(Exception e) { + logger.info("Failed to write exception reindex index", e); + ex.addSuppressed(e); + updateClusterStateToFailed(shouldStoreResult, ReindexJobState.Status.FAILED_TO_WRITE_TO_REINDEX_INDEX, ex); + } + }); + } + + private void updateClusterStateToFailed(boolean shouldStoreResult, ReindexJobState.Status status, Exception ex) { + updatePersistentTaskState(new ReindexJobState(taskId, status), new ActionListener<>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { - markEphemeralTaskFailed(shouldStoreResult, taskManager, ex); + markEphemeralTaskFailed(shouldStoreResult, ex); } @Override public void onFailure(Exception e) { - logger.info("Failed to update task state to failed", e); + logger.info("Failed to update task in cluster state to failed", e); ex.addSuppressed(e); - markEphemeralTaskFailed(shouldStoreResult, taskManager, ex); + markEphemeralTaskFailed(shouldStoreResult, ex); } }); } - private void markEphemeralTaskFailed(boolean shouldStoreResult, TaskManager taskManager, Exception ex) { + private void markEphemeralTaskFailed(boolean shouldStoreResult, Exception ex) { + TaskManager taskManager = getTaskManager(); + assert taskManager != null : "TaskManager should have been set before reindex started"; if (shouldStoreResult) { taskManager.storeResult(ReindexTask.this, ex, ActionListener.wrap(() -> markAsFailed(ex))); } else { diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportStartReindexJobAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportStartReindexJobAction.java index b0abbe2947b1a..7d3de2b139a40 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportStartReindexJobAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportStartReindexJobAction.java @@ -19,60 +19,45 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.AutoCreateIndex; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; -import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.function.Predicate; -import static org.elasticsearch.index.reindex.ReindexTaskIndexState.REINDEX_ORIGIN; - public class TransportStartReindexJobAction extends HandledTransportAction { private final ThreadPool threadPool; - private final ClusterService clusterService; private final PersistentTasksService persistentTasksService; private final ReindexValidator reindexValidator; - private final Client taskClient; + private final ReindexIndexClient reindexIndexClient; @Inject public TransportStartReindexJobAction(Settings settings, Client client, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, PersistentTasksService persistentTasksService, - AutoCreateIndex autoCreateIndex) { + AutoCreateIndex autoCreateIndex, NamedXContentRegistry xContentRegistry) { super(StartReindexJobAction.NAME, transportService, actionFilters, StartReindexJobAction.Request::new); this.threadPool = threadPool; - this.clusterService = clusterService; this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex); this.persistentTasksService = persistentTasksService; - this.taskClient = new OriginSettingClient(client, REINDEX_ORIGIN); + this.reindexIndexClient = new ReindexIndexClient(client, clusterService, xContentRegistry); } @Override @@ -89,10 +74,8 @@ protected void doExecute(Task task, StartReindexJobAction.Request request, Actio boolean storeTaskResult = request.getWaitForCompletion() == false; ReindexJob job = new ReindexJob(storeTaskResult, threadPool.getThreadContext().getHeaders()); - ClusterState clusterState = clusterService.state(); - boolean reindexIndexExists = clusterState.routingTable().hasIndex(ReindexTask.REINDEX_INDEX); - - createReindexTaskDoc(generatedId, request.getReindexRequest(), reindexIndexExists, new ActionListener<>() { + ReindexTaskIndexState reindexState = new ReindexTaskIndexState(request.getReindexRequest()); + reindexIndexClient.createReindexTaskDoc(generatedId, reindexState, new ActionListener<>() { @Override public void onResponse(Void v) { // TODO: Task name @@ -128,11 +111,32 @@ private void waitForReindexDone(String taskId, ActionListener task) { ReindexJobState state = (ReindexJobState) task.getState(); - if (state.getJobException() == null) { - listener.onResponse(new StartReindexJobAction.Response(taskId, state.getReindexResponse())); + if (state.getStatus() == ReindexJobState.Status.FAILED_TO_READ_FROM_REINDEX_INDEX) { + listener.onFailure(new ElasticsearchException("Reindexing failed. Task node could not read from " + + ReindexIndexClient.REINDEX_INDEX + " index")); + } else if (state.getStatus() == ReindexJobState.Status.FAILED_TO_WRITE_TO_REINDEX_INDEX) { + listener.onFailure(new ElasticsearchException("Reindexing failed. Task node could not write result to " + + ReindexIndexClient.REINDEX_INDEX + " index")); } else { - listener.onFailure(state.getJobException()); + reindexIndexClient.getReindexTaskDoc(taskId, new ActionListener<>() { + @Override + public void onResponse(ReindexTaskIndexState reindexState) { + if (reindexState.getException() == null) { + listener.onResponse(new StartReindexJobAction.Response(taskId, reindexState.getReindexResponse())); + } else { + Exception exception = reindexState.getException(); + RestStatus statusCode = reindexState.getFailureStatusCode(); + listener.onFailure(new ElasticsearchStatusException(exception.getMessage(), statusCode, exception)); + } + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } + } @Override @@ -159,57 +163,6 @@ public void onFailure(Exception e) { }); } - private void createReindexTaskDoc(String taskId, ReindexRequest reindexRequest, boolean indexExists, ActionListener listener) { - if (indexExists) { - IndexRequest indexRequest = new IndexRequest(ReindexTask.REINDEX_INDEX).id(taskId).opType(DocWriteRequest.OpType.CREATE); - try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON)) { - ReindexTaskIndexState reindexState = new ReindexTaskIndexState(reindexRequest); - reindexState.toXContent(builder, ToXContent.EMPTY_PARAMS); - indexRequest.source(builder); - } catch (IOException e) { - listener.onFailure(new ElasticsearchException("Couldn't serialize reindex request into XContent", e)); - } - taskClient.index(indexRequest, new ActionListener<>() { - @Override - public void onResponse(IndexResponse indexResponse) { - listener.onResponse(null); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } else { - CreateIndexRequest createIndexRequest = new CreateIndexRequest(); - createIndexRequest.settings(reindexIndexSettings()); - createIndexRequest.index(ReindexTask.REINDEX_INDEX); - createIndexRequest.cause("auto(reindex api)"); - createIndexRequest.mapping("_doc", "{\"dynamic\": false}", XContentType.JSON); - - taskClient.admin().indices().create(createIndexRequest, new ActionListener<>() { - @Override - public void onResponse(CreateIndexResponse result) { - createReindexTaskDoc(taskId, reindexRequest, true, listener); - } - - @Override - public void onFailure(Exception e) { - if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) { - try { - createReindexTaskDoc(taskId, reindexRequest, true, listener); - } catch (Exception inner) { - inner.addSuppressed(e); - listener.onFailure(inner); - } - } else { - listener.onFailure(e); - } - } - }); - } - } - private static class ReindexPredicate implements Predicate> { private boolean waitForDone; @@ -243,16 +196,7 @@ private boolean isStarted(ReindexJobState state) { } private boolean isDone(ReindexJobState state) { - return state != null && (state.getReindexResponse() != null || state.getJobException() != null); + return state != null && state.isDone(); } } - - private Settings reindexIndexSettings() { - // TODO: Copied from task index - return Settings.builder() - .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) - .put(IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING.getKey(), "0-1") - .put(IndexMetaData.SETTING_PRIORITY, Integer.MAX_VALUE) - .build(); - } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFailoverIT.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFailoverIT.java index cfb4269ae9c8d..cfa1366db3091 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFailoverIT.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFailoverIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.MockSearchService; +import org.elasticsearch.search.SearchService; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; @@ -46,6 +47,12 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class ReindexFailoverIT extends ReindexTestCase { + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(SearchService.KEEPALIVE_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1)).build(); + } + @Override protected Collection> nodePlugins() { final HashSet> classes = new HashSet<>(super.nodePlugins()); @@ -103,7 +110,7 @@ public void testReindexFailover() throws Throwable { String nodeName = nodeIdToName.get(nodeId); logger.info("--> restarting node: " + nodeName); - ensureGreen(ReindexTask.REINDEX_INDEX); + ensureGreen(ReindexIndexClient.REINDEX_INDEX); internalCluster().restartNode(nodeName, new InternalTestCluster.RestartCallback()); ensureYellow("dest"); diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yml index 9ef6c1a90c400..f4a3e1b63e93d 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yml @@ -158,7 +158,8 @@ - match: {failures.0.index: dest} - match: {failures.0.id: "1"} - match: {failures.0.status: 409} - - match: {failures.0.cause.type: version_conflict_engine_exception} + # TODO: Changed the message because of exception serialization + - match: {failures.0.cause.type: exception} # Use a regex so we don't mind if the version isn't always 1. Sometimes it comes out 2. - match: {failures.0.cause.reason: "/\\[1\\]:.version.conflict,.document.already.exists.\\(current.version.\\[\\d+\\]\\)/"} - match: {failures.0.cause.shard: /\d+/} diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/35_search_failures.yml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/35_search_failures.yml index 5fd888f77a119..5bee4d9844d30 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/35_search_failures.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/35_search_failures.yml @@ -35,8 +35,12 @@ - match: {failures.0.shard: 0} - match: {failures.0.index: source} - is_true: failures.0.node - - match: {failures.0.reason.type: script_exception} - - match: {failures.0.reason.reason: runtime error} - - match: {failures.0.reason.caused_by.type: illegal_argument_exception} - - match: {failures.0.reason.caused_by.reason: Cats!} + # TODO: Changed the message because of exception serialization + - match: {failures.0.reason.type: exception} + # TODO: Changed the message because of exception serialization + - match: {failures.0.reason.reason: "Elasticsearch exception [type=script_exception, reason=runtime error]"} + # TODO: Changed the message because of exception serialization + - match: {failures.0.reason.caused_by.type: exception} + # TODO: Changed the message because of exception serialization + - match: {failures.0.reason.caused_by.reason: "Elasticsearch exception [type=illegal_argument_exception, reason=Cats!]"} - gte: { took: 0 } diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/85_scripting.yml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/85_scripting.yml index ee920a669a504..05481d02d16c6 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/85_scripting.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/85_scripting.yml @@ -439,4 +439,5 @@ script: lang: painless source: syntax errors are fun! - - match: {error.reason: 'compile error'} + # TODO: Changed the message because of exception serialization + - match: {error.reason: 'Elasticsearch exception [type=script_exception, reason=compile error]'} diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ReindexIndexClient.java b/server/src/main/java/org/elasticsearch/index/reindex/ReindexIndexClient.java new file mode 100644 index 0000000000000..0932399f0638d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/reindex/ReindexIndexClient.java @@ -0,0 +1,163 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.reindex; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; + +import java.io.IOException; + +public class ReindexIndexClient { + + // TODO: Eventually this should be an alias for index versioning + public static final String REINDEX_INDEX = ".reindex"; + public static final String REINDEX_ORIGIN = "reindex"; + + private final Client client; + private final ClusterService clusterService; + private final NamedXContentRegistry xContentRegistry; + + public ReindexIndexClient(Client client, ClusterService clusterService, NamedXContentRegistry xContentRegistry) { + this.client = new OriginSettingClient(client, REINDEX_ORIGIN); + this.clusterService = clusterService; + this.xContentRegistry = xContentRegistry; + } + + public void getReindexTaskDoc(String taskId, ActionListener listener) { + GetRequest getRequest = new GetRequest(REINDEX_INDEX).id(taskId); + client.get(getRequest, new ActionListener<>() { + @Override + public void onResponse(GetResponse response) { + BytesReference source = response.getSourceAsBytesRef(); + try (XContentParser parser = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, source, + XContentType.JSON)) { + ReindexTaskIndexState taskState = ReindexTaskIndexState.fromXContent(parser); + listener.onResponse(taskState); + } catch (IOException e) { + listener.onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + + public void createReindexTaskDoc(String taskId, ReindexTaskIndexState reindexState, ActionListener listener) { + boolean reindexIndexExists = clusterService.state().routingTable().hasIndex(ReindexIndexClient.REINDEX_INDEX); + createReindexTaskDoc(taskId, reindexState, reindexIndexExists, listener); + } + + private void createReindexTaskDoc(String taskId, ReindexTaskIndexState reindexState, boolean indexExists, + ActionListener listener) { + if (indexExists) { + index(taskId, reindexState, DocWriteRequest.OpType.CREATE, listener); + } else { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(); + createIndexRequest.settings(reindexIndexSettings()); + createIndexRequest.index(REINDEX_INDEX); + createIndexRequest.cause("auto(reindex api)"); + createIndexRequest.mapping("_doc", "{\"dynamic\": false}", XContentType.JSON); + + client.admin().indices().create(createIndexRequest, new ActionListener<>() { + @Override + public void onResponse(CreateIndexResponse result) { + createReindexTaskDoc(taskId, reindexState, true, listener); + } + + @Override + public void onFailure(Exception e) { + if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) { + try { + createReindexTaskDoc(taskId, reindexState, true, listener); + } catch (Exception inner) { + inner.addSuppressed(e); + listener.onFailure(inner); + } + } else { + listener.onFailure(e); + } + } + }); + } + } + + // TODO: Potentially add compare-and-set semantics to ensure that another node has not been assigned + // this task + public void updateReindexTaskDoc(String taskId, ReindexTaskIndexState reindexState, ActionListener listener) { + index(taskId, reindexState, DocWriteRequest.OpType.INDEX, listener); + } + + private void index(String taskId, ReindexTaskIndexState reindexState, DocWriteRequest.OpType opType, ActionListener listener) { + IndexRequest indexRequest = new IndexRequest(REINDEX_INDEX).id(taskId).opType(opType); + try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON)) { + reindexState.toXContent(builder, ToXContent.EMPTY_PARAMS); + indexRequest.source(builder); + } catch (IOException e) { + listener.onFailure(new ElasticsearchException("Couldn't serialize ReindexTaskIndexState into XContent", e)); + return; + } + client.index(indexRequest, new ActionListener<>() { + @Override + public void onResponse(IndexResponse indexResponse) { + listener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + + private static Settings reindexIndexSettings() { + // TODO: Copied from task index + return Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING.getKey(), "0-1") + .put(IndexMetaData.SETTING_PRIORITY, Integer.MAX_VALUE) + .build(); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ReindexTaskIndexState.java b/server/src/main/java/org/elasticsearch/index/reindex/ReindexTaskIndexState.java index e9def71be1f73..671f1e25af5cc 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ReindexTaskIndexState.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ReindexTaskIndexState.java @@ -19,11 +19,14 @@ package org.elasticsearch.index.reindex; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -31,19 +34,45 @@ public class ReindexTaskIndexState implements ToXContentObject { public static final String REINDEX_ORIGIN = "reindex"; public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>("reindex/index_state", a -> new ReindexTaskIndexState((ReindexRequest) a[0])); + new ConstructingObjectParser<>("reindex/index_state", a -> new ReindexTaskIndexState((ReindexRequest) a[0], + (BulkByScrollResponse) a[1], (ElasticsearchException) a[2], (Integer) a[3])); private static final String REINDEX_REQUEST = "request"; + private static final String REINDEX_RESPONSE = "response"; + private static final String REINDEX_EXCEPTION = "exception"; + private static final String FAILURE_REST_STATUS = "failure_rest_status"; static { PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ReindexRequest.fromXContentWithParams(p), new ParseField(REINDEX_REQUEST)); + PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> BulkByScrollResponse.fromXContent(p), + new ParseField(REINDEX_RESPONSE)); + PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), + new ParseField(REINDEX_EXCEPTION)); + PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), new ParseField(FAILURE_REST_STATUS)); } private final ReindexRequest reindexRequest; + private final BulkByScrollResponse reindexResponse; + private final Exception exception; + private final RestStatus failureStatusCode; public ReindexTaskIndexState(ReindexRequest reindexRequest) { + this(reindexRequest, null, null, (RestStatus) null); + } + + public ReindexTaskIndexState(ReindexRequest reindexRequest, @Nullable BulkByScrollResponse reindexResponse, + @Nullable ElasticsearchException exception, @Nullable Integer failureStatusCode) { + this(reindexRequest, reindexResponse, exception, failureStatusCode == null ? null : RestStatus.fromCode(failureStatusCode)); + } + + public ReindexTaskIndexState(ReindexRequest reindexRequest, @Nullable BulkByScrollResponse reindexResponse, + @Nullable ElasticsearchException exception, @Nullable RestStatus failureStatusCode) { + assert (reindexResponse == null) || (exception == null) : "Either response or exception must be null"; this.reindexRequest = reindexRequest; + this.reindexResponse = reindexResponse; + this.exception = exception; + this.failureStatusCode = failureStatusCode; } @Override @@ -51,6 +80,19 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(REINDEX_REQUEST); reindexRequest.toXContent(builder, params, true); + if (reindexResponse != null) { + builder.field(REINDEX_RESPONSE); + builder.startObject(); + reindexResponse.toXContent(builder, params); + builder.endObject(); + } + if (exception != null) { + builder.field(REINDEX_EXCEPTION); + builder.startObject(); + ElasticsearchException.generateThrowableXContent(builder, params, exception); + builder.endObject(); + builder.field(FAILURE_REST_STATUS, failureStatusCode.getStatus()); + } return builder.endObject(); } @@ -61,4 +103,16 @@ public static ReindexTaskIndexState fromXContent(XContentParser parser) { public ReindexRequest getReindexRequest() { return reindexRequest; } + + public BulkByScrollResponse getReindexResponse() { + return reindexResponse; + } + + public Exception getException() { + return exception; + } + + public RestStatus getFailureStatusCode() { + return failureStatusCode; + } }