Skip to content

After reindexing a data stream index, delete the source index #120237

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.support.CountDownActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.DataStream;
Expand Down Expand Up @@ -211,26 +214,29 @@ private void maybeProcessNextIndex(
reindexDataStreamTask.incrementInProgressIndicesCount(index.getName());
ReindexDataStreamIndexAction.Request reindexDataStreamIndexRequest = new ReindexDataStreamIndexAction.Request(index.getName());
reindexDataStreamIndexRequest.setParentTask(parentTaskId);
reindexClient.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, ActionListener.wrap(response1 -> {
updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> {

SubscribableListener.<ReindexDataStreamIndexAction.Response>newForked(
l -> reindexClient.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l)
)
.<AcknowledgedResponse>andThen(
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, reindexClient, parentTaskId)
)
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), reindexClient, parentTaskId, l))
.addListener(ActionListener.wrap(unused -> {
reindexDataStreamTask.reindexSucceeded(index.getName());
listener.onResponse(null);
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
}, exception -> {
reindexDataStreamTask.reindexFailed(index.getName(), exception);
}, e -> {
reindexDataStreamTask.reindexFailed(index.getName(), e);
listener.onResponse(null);
}), reindexClient, parentTaskId);
}, exception -> {
reindexDataStreamTask.reindexFailed(index.getName(), exception);
listener.onResponse(null);
}));
}));
}

private void updateDataStream(
String dataStream,
String oldIndex,
String newIndex,
ActionListener<Void> listener,
ActionListener<AcknowledgedResponse> listener,
ExecuteWithHeadersClient reindexClient,
TaskId parentTaskId
) {
Expand All @@ -240,17 +246,18 @@ private void updateDataStream(
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
);
modifyDataStreamRequest.setParentTask(parentTaskId);
reindexClient.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {
listener.onResponse(null);
}
reindexClient.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
private void deleteIndex(
String indexName,
ExecuteWithHeadersClient reindexClient,
TaskId parentTaskId,
ActionListener<AcknowledgedResponse> listener
) {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
deleteIndexRequest.setParentTask(parentTaskId);
reindexClient.execute(TransportDeleteIndexAction.TYPE, deleteIndexRequest, listener);
}

private void completeSuccessfulPersistentTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsearch.upgrades.IndexingIT.assertCount;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -256,6 +258,7 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
}

private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception {
Set<String> indicesNeedingUpgrade = getDataStreamIndices(dataStreamName);
final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2);
for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) {
rollover(dataStreamName);
Expand Down Expand Up @@ -292,10 +295,9 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
} else {
// The number of rollovers that will have happened when we call reindex:
final int rolloversPerformedByReindex = explicitRolloverOnNewClusterCount == 0 ? 1 : 0;
assertThat(
statusResponseMap.get("total_indices_in_data_stream"),
equalTo(originalWriteIndex + numRolloversOnOldCluster + explicitRolloverOnNewClusterCount + rolloversPerformedByReindex)
);
final int expectedTotalIndicesInDataStream = originalWriteIndex + numRolloversOnOldCluster
+ explicitRolloverOnNewClusterCount + rolloversPerformedByReindex;
assertThat(statusResponseMap.get("total_indices_in_data_stream"), equalTo(expectedTotalIndicesInDataStream));
/*
* total_indices_requiring_upgrade is made up of: (the original write index) + numRolloversOnOldCluster. The number of
* rollovers on the upgraded cluster is irrelevant since those will not be reindexed.
Expand All @@ -305,13 +307,28 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
equalTo(originalWriteIndex + numRolloversOnOldCluster)
);
assertThat(statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1));
// We expect all the original indices to have been deleted
for (String oldIndex : indicesNeedingUpgrade) {
assertThat(indexExists(oldIndex), equalTo(false));
}
assertThat(getDataStreamIndices(dataStreamName).size(), equalTo(expectedTotalIndicesInDataStream));
}
}, 60, TimeUnit.SECONDS);
Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
Response cancelResponse = client().performRequest(cancelRequest);
assertOK(cancelResponse);
}

@SuppressWarnings("unchecked")
private Set<String> getDataStreamIndices(String dataStreamName) throws IOException {
Response response = client().performRequest(new Request("GET", "_data_stream/" + dataStreamName));
Map<String, Object> responseMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(), false);
List<Map<String, Object>> dataStreams = (List<Map<String, Object>>) responseMap.get("data_streams");
Map<String, Object> dataStream = dataStreams.get(0);
List<Map<String, Object>> indices = (List<Map<String, Object>>) dataStream.get("indices");
return indices.stream().map(index -> index.get("index_name").toString()).collect(Collectors.toSet());
}

/*
* Similar to isOriginalClusterCurrent, but returns true if the major versions of the clusters are the same. So true
* for 8.6 and 8.17, but false for 7.17 and 8.18.
Expand Down