Skip to content

Commit e0161fe

Browse files
authored
After reindexing a data stream index, delete the source index (#120237) (#120240)
1 parent 20ab248 commit e0161fe

File tree

2 files changed

+48
-24
lines changed

2 files changed

+48
-24
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.ElasticsearchException;
1313
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
15+
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
1416
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
1517
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
1618
import org.elasticsearch.action.datastreams.GetDataStreamAction;
1719
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
1820
import org.elasticsearch.action.support.CountDownActionListener;
21+
import org.elasticsearch.action.support.SubscribableListener;
1922
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2023
import org.elasticsearch.client.internal.Client;
2124
import org.elasticsearch.cluster.metadata.DataStream;
@@ -210,26 +213,29 @@ private void maybeProcessNextIndex(
210213
reindexDataStreamTask.incrementInProgressIndicesCount(index.getName());
211214
ReindexDataStreamIndexAction.Request reindexDataStreamIndexRequest = new ReindexDataStreamIndexAction.Request(index.getName());
212215
reindexDataStreamIndexRequest.setParentTask(parentTaskId);
213-
reindexClient.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, ActionListener.wrap(response1 -> {
214-
updateDataStream(sourceDataStream, index.getName(), response1.getDestIndex(), ActionListener.wrap(unused -> {
216+
217+
SubscribableListener.<ReindexDataStreamIndexAction.Response>newForked(
218+
l -> reindexClient.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l)
219+
)
220+
.<AcknowledgedResponse>andThen(
221+
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, reindexClient, parentTaskId)
222+
)
223+
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), reindexClient, parentTaskId, l))
224+
.addListener(ActionListener.wrap(unused -> {
215225
reindexDataStreamTask.reindexSucceeded(index.getName());
216226
listener.onResponse(null);
217227
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
218-
}, exception -> {
219-
reindexDataStreamTask.reindexFailed(index.getName(), exception);
228+
}, e -> {
229+
reindexDataStreamTask.reindexFailed(index.getName(), e);
220230
listener.onResponse(null);
221-
}), reindexClient, parentTaskId);
222-
}, exception -> {
223-
reindexDataStreamTask.reindexFailed(index.getName(), exception);
224-
listener.onResponse(null);
225-
}));
231+
}));
226232
}
227233

228234
private void updateDataStream(
229235
String dataStream,
230236
String oldIndex,
231237
String newIndex,
232-
ActionListener<Void> listener,
238+
ActionListener<AcknowledgedResponse> listener,
233239
ExecuteWithHeadersClient reindexClient,
234240
TaskId parentTaskId
235241
) {
@@ -239,17 +245,18 @@ private void updateDataStream(
239245
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
240246
);
241247
modifyDataStreamRequest.setParentTask(parentTaskId);
242-
reindexClient.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, new ActionListener<>() {
243-
@Override
244-
public void onResponse(AcknowledgedResponse response) {
245-
listener.onResponse(null);
246-
}
248+
reindexClient.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener);
249+
}
247250

248-
@Override
249-
public void onFailure(Exception e) {
250-
listener.onFailure(e);
251-
}
252-
});
251+
private void deleteIndex(
252+
String indexName,
253+
ExecuteWithHeadersClient reindexClient,
254+
TaskId parentTaskId,
255+
ActionListener<AcknowledgedResponse> listener
256+
) {
257+
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
258+
deleteIndexRequest.setParentTask(parentTaskId);
259+
reindexClient.execute(TransportDeleteIndexAction.TYPE, deleteIndexRequest, listener);
253260
}
254261

255262
private void completeSuccessfulPersistentTask(

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import java.time.Instant;
2727
import java.util.List;
2828
import java.util.Map;
29+
import java.util.Set;
2930
import java.util.concurrent.TimeUnit;
31+
import java.util.stream.Collectors;
3032

3133
import static org.elasticsearch.upgrades.IndexingIT.assertCount;
3234
import static org.hamcrest.Matchers.equalTo;
@@ -251,6 +253,7 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
251253
}
252254

253255
private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception {
256+
Set<String> indicesNeedingUpgrade = getDataStreamIndices(dataStreamName);
254257
final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2);
255258
for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) {
256259
rollover(dataStreamName);
@@ -287,10 +290,9 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
287290
} else {
288291
// The number of rollovers that will have happened when we call reindex:
289292
final int rolloversPerformedByReindex = explicitRolloverOnNewClusterCount == 0 ? 1 : 0;
290-
assertThat(
291-
statusResponseMap.get("total_indices_in_data_stream"),
292-
equalTo(originalWriteIndex + numRolloversOnOldCluster + explicitRolloverOnNewClusterCount + rolloversPerformedByReindex)
293-
);
293+
final int expectedTotalIndicesInDataStream = originalWriteIndex + numRolloversOnOldCluster
294+
+ explicitRolloverOnNewClusterCount + rolloversPerformedByReindex;
295+
assertThat(statusResponseMap.get("total_indices_in_data_stream"), equalTo(expectedTotalIndicesInDataStream));
294296
/*
295297
* total_indices_requiring_upgrade is made up of: (the original write index) + numRolloversOnOldCluster. The number of
296298
* rollovers on the upgraded cluster is irrelevant since those will not be reindexed.
@@ -300,13 +302,28 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
300302
equalTo(originalWriteIndex + numRolloversOnOldCluster)
301303
);
302304
assertThat(statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1));
305+
// We expect all the original indices to have been deleted
306+
for (String oldIndex : indicesNeedingUpgrade) {
307+
assertThat(indexExists(oldIndex), equalTo(false));
308+
}
309+
assertThat(getDataStreamIndices(dataStreamName).size(), equalTo(expectedTotalIndicesInDataStream));
303310
}
304311
}, 60, TimeUnit.SECONDS);
305312
Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
306313
Response cancelResponse = client().performRequest(cancelRequest);
307314
assertOK(cancelResponse);
308315
}
309316

317+
@SuppressWarnings("unchecked")
318+
private Set<String> getDataStreamIndices(String dataStreamName) throws IOException {
319+
Response response = client().performRequest(new Request("GET", "_data_stream/" + dataStreamName));
320+
Map<String, Object> responseMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(), false);
321+
List<Map<String, Object>> dataStreams = (List<Map<String, Object>>) responseMap.get("data_streams");
322+
Map<String, Object> dataStream = dataStreams.get(0);
323+
List<Map<String, Object>> indices = (List<Map<String, Object>>) dataStream.get("indices");
324+
return indices.stream().map(index -> index.get("index_name").toString()).collect(Collectors.toSet());
325+
}
326+
310327
/*
311328
* Similar to isOriginalClusterCurrent, but returns true if the major versions of the clusters are the same. So true
312329
* for 8.6 and 8.17, but false for 7.17 and 8.18.

0 commit comments

Comments
 (0)