diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index 104f843cc7950..ab86d957c39db 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -11,11 +11,14 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; import org.elasticsearch.action.admin.indices.rollover.RolloverAction; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.action.datastreams.GetDataStreamAction; import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.support.CountDownActionListener; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.DataStream; @@ -211,26 +214,29 @@ private void maybeProcessNextIndex( reindexDataStreamTask.incrementInProgressIndicesCount(index.getName()); ReindexDataStreamIndexAction.Request reindexDataStreamIndexRequest = new ReindexDataStreamIndexAction.Request(index.getName()); reindexDataStreamIndexRequest.setParentTask(parentTaskId); - reindexClient.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, ActionListener.wrap(response1 -> { - updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> { + + SubscribableListener.newForked( + l -> reindexClient.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l) + ) + .andThen( + (l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, reindexClient, parentTaskId) + ) + .andThen(l -> deleteIndex(index.getName(), reindexClient, parentTaskId, l)) + .addListener(ActionListener.wrap(unused -> { reindexDataStreamTask.reindexSucceeded(index.getName()); listener.onResponse(null); maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId); - }, exception -> { - reindexDataStreamTask.reindexFailed(index.getName(), exception); + }, e -> { + reindexDataStreamTask.reindexFailed(index.getName(), e); listener.onResponse(null); - }), reindexClient, parentTaskId); - }, exception -> { - reindexDataStreamTask.reindexFailed(index.getName(), exception); - listener.onResponse(null); - })); + })); } private void updateDataStream( String dataStream, String oldIndex, String newIndex, - ActionListener listener, + ActionListener listener, ExecuteWithHeadersClient reindexClient, TaskId parentTaskId ) { @@ -240,17 +246,18 @@ private void updateDataStream( List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex)) ); modifyDataStreamRequest.setParentTask(parentTaskId); - reindexClient.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse response) { - listener.onResponse(null); - } + reindexClient.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener); + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + private void deleteIndex( + String indexName, + ExecuteWithHeadersClient reindexClient, + TaskId parentTaskId, + ActionListener listener + ) { + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); + deleteIndexRequest.setParentTask(parentTaskId); + reindexClient.execute(TransportDeleteIndexAction.TYPE, deleteIndexRequest, listener); } private void completeSuccessfulPersistentTask( diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java index 094ca9304a695..846c3c47a2714 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java @@ -26,7 +26,9 @@ import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.elasticsearch.upgrades.IndexingIT.assertCount; import static org.hamcrest.Matchers.equalTo; @@ -256,6 +258,7 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo } private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception { + Set indicesNeedingUpgrade = getDataStreamIndices(dataStreamName); final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2); for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) { rollover(dataStreamName); @@ -292,10 +295,9 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust } else { // The number of rollovers that will have happened when we call reindex: final int rolloversPerformedByReindex = explicitRolloverOnNewClusterCount == 0 ? 1 : 0; - assertThat( - statusResponseMap.get("total_indices_in_data_stream"), - equalTo(originalWriteIndex + numRolloversOnOldCluster + explicitRolloverOnNewClusterCount + rolloversPerformedByReindex) - ); + final int expectedTotalIndicesInDataStream = originalWriteIndex + numRolloversOnOldCluster + + explicitRolloverOnNewClusterCount + rolloversPerformedByReindex; + assertThat(statusResponseMap.get("total_indices_in_data_stream"), equalTo(expectedTotalIndicesInDataStream)); /* * total_indices_requiring_upgrade is made up of: (the original write index) + numRolloversOnOldCluster. The number of * rollovers on the upgraded cluster is irrelevant since those will not be reindexed. @@ -305,6 +307,11 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust equalTo(originalWriteIndex + numRolloversOnOldCluster) ); assertThat(statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1)); + // We expect all the original indices to have been deleted + for (String oldIndex : indicesNeedingUpgrade) { + assertThat(indexExists(oldIndex), equalTo(false)); + } + assertThat(getDataStreamIndices(dataStreamName).size(), equalTo(expectedTotalIndicesInDataStream)); } }, 60, TimeUnit.SECONDS); Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel"); @@ -312,6 +319,16 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust assertOK(cancelResponse); } + @SuppressWarnings("unchecked") + private Set getDataStreamIndices(String dataStreamName) throws IOException { + Response response = client().performRequest(new Request("GET", "_data_stream/" + dataStreamName)); + Map responseMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(), false); + List> dataStreams = (List>) responseMap.get("data_streams"); + Map dataStream = dataStreams.get(0); + List> indices = (List>) dataStream.get("indices"); + return indices.stream().map(index -> index.get("index_name").toString()).collect(Collectors.toSet()); + } + /* * Similar to isOriginalClusterCurrent, but returns true if the major versions of the clusters are the same. So true * for 8.6 and 8.17, but false for 7.17 and 8.18.