Skip to content

Commit d334e3b

Browse files
Improve Partial Snapshot Rollover Behavior (#69364)
Using new reconciliation functionality to not needlessly drop rolling over data streams from the final snapshot. closes #68536
1 parent bb3ea99 commit d334e3b

File tree

4 files changed

+73
-29
lines changed

4 files changed

+73
-29
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import java.io.IOException;
2626
import java.util.ArrayList;
27+
import java.util.Collection;
2728
import java.util.Collections;
2829
import java.util.HashMap;
2930
import java.util.List;
@@ -177,15 +178,19 @@ public DataStream promoteDataStream() {
177178
* stream definitions that do not reference backing indices not contained in the snapshot.
178179
*
179180
* @param indicesInSnapshot List of indices in the snapshot
180-
* @return Reconciled {@link DataStream} instance
181+
* @return Reconciled {@link DataStream} instance or {@code null} if no reconciled version of this data stream could be built from the
182+
* given indices
181183
*/
182-
public DataStream snapshot(List<String> indicesInSnapshot) {
184+
@Nullable
185+
public DataStream snapshot(Collection<String> indicesInSnapshot) {
183186
// do not include indices not available in the snapshot
184187
List<Index> reconciledIndices = new ArrayList<>(this.indices);
185-
reconciledIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false);
188+
if (reconciledIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false) == false) {
189+
return this;
190+
}
186191

187192
if (reconciledIndices.size() == 0) {
188-
throw new IllegalArgumentException("cannot reconcile data stream without at least one backing index");
193+
return null;
189194
}
190195

191196
return new DataStream(

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -792,8 +792,9 @@ private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot,
792792
break;
793793
}
794794
}
795-
if (missingIndex == false) {
796-
dataStreams.put(dataStreamName, dataStream);
795+
final DataStream reconciled = missingIndex ? dataStream.snapshot(indicesInSnapshot) : dataStream;
796+
if (reconciled != null) {
797+
dataStreams.put(dataStreamName, reconciled);
797798
}
798799
}
799800
}

server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.stream.Collectors;
2525

2626
import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
27-
import static org.hamcrest.Matchers.containsString;
2827
import static org.hamcrest.Matchers.equalTo;
2928
import static org.hamcrest.Matchers.everyItem;
3029
import static org.hamcrest.Matchers.hasItems;
@@ -221,12 +220,7 @@ public void testSnapshotWithAllBackingIndicesRemoved() {
221220
preSnapshotDataStream.isReplicated()
222221
);
223222

224-
IllegalArgumentException e = expectThrows(
225-
IllegalArgumentException.class,
226-
() -> postSnapshotDataStream.snapshot(
227-
preSnapshotDataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList())
228-
)
229-
);
230-
assertThat(e.getMessage(), containsString("cannot reconcile data stream without at least one backing index"));
223+
assertNull(postSnapshotDataStream.snapshot(
224+
preSnapshotDataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList())));
231225
}
232226
}

x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@
5151
import static org.hamcrest.Matchers.containsString;
5252
import static org.hamcrest.Matchers.empty;
5353
import static org.hamcrest.Matchers.equalTo;
54+
import static org.hamcrest.Matchers.hasItems;
5455
import static org.hamcrest.Matchers.is;
56+
import static org.hamcrest.Matchers.not;
5557
import static org.hamcrest.Matchers.nullValue;
5658

5759
public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
@@ -541,22 +543,64 @@ public void testSnapshotDSDuringRollover() throws Exception {
541543
unblockAllDataNodes(repoName);
542544
final SnapshotInfo snapshotInfo = assertSuccessful(snapshotFuture);
543545

544-
if (snapshotInfo.dataStreams().contains("ds")) {
545-
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" })).get());
546+
assertThat(snapshotInfo.dataStreams(), hasItems("ds"));
547+
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" })).get());
546548

547-
RestoreInfo restoreSnapshotResponse = client().admin()
548-
.cluster()
549-
.prepareRestoreSnapshot(repoName, snapshotName)
550-
.setWaitForCompletion(true)
551-
.setIndices("ds")
552-
.get()
553-
.getRestoreInfo();
549+
RestoreInfo restoreSnapshotResponse = client().admin()
550+
.cluster()
551+
.prepareRestoreSnapshot(repoName, snapshotName)
552+
.setWaitForCompletion(true)
553+
.setIndices("ds")
554+
.get()
555+
.getRestoreInfo();
554556

555-
assertEquals(restoreSnapshotResponse.successfulShards(), restoreSnapshotResponse.totalShards());
556-
assertEquals(restoreSnapshotResponse.failedShards(), 0);
557-
assertFalse(partial);
558-
} else {
559-
assertTrue(partial);
560-
}
557+
assertEquals(restoreSnapshotResponse.successfulShards(), restoreSnapshotResponse.totalShards());
558+
assertEquals(restoreSnapshotResponse.failedShards(), 0);
559+
}
560+
561+
public void testSnapshotDSDuringRolloverAndDeleteOldIndex() throws Exception {
562+
// repository consistency check requires at least one snapshot per registered repository
563+
createFullSnapshot(REPO, "snap-so-repo-checks-pass");
564+
final String repoName = "mock-repo";
565+
createRepository(repoName, "mock");
566+
blockAllDataNodes(repoName);
567+
final String snapshotName = "ds-snap";
568+
final ActionFuture<CreateSnapshotResponse> snapshotFuture = client().admin()
569+
.cluster()
570+
.prepareCreateSnapshot(repoName, snapshotName)
571+
.setWaitForCompletion(true)
572+
.setPartial(true)
573+
.setIncludeGlobalState(randomBoolean())
574+
.execute();
575+
waitForBlockOnAnyDataNode(repoName);
576+
awaitNumberOfSnapshotsInProgress(1);
577+
final RolloverResponse rolloverResponse = client().admin().indices().rolloverIndex(new RolloverRequest("ds", null)).get();
578+
assertTrue(rolloverResponse.isRolledOver());
579+
580+
logger.info("--> deleting former write index");
581+
assertAcked(client().admin().indices().prepareDelete(rolloverResponse.getOldIndex()));
582+
583+
unblockAllDataNodes(repoName);
584+
final SnapshotInfo snapshotInfo = assertSuccessful(snapshotFuture);
585+
586+
assertThat(
587+
"snapshot should not contain 'ds' since none of its indices existed both at the start and at the end of the snapshot",
588+
snapshotInfo.dataStreams(),
589+
not(hasItems("ds"))
590+
);
591+
assertAcked(
592+
client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "other-ds" })).get()
593+
);
594+
595+
RestoreInfo restoreSnapshotResponse = client().admin()
596+
.cluster()
597+
.prepareRestoreSnapshot(repoName, snapshotName)
598+
.setWaitForCompletion(true)
599+
.setIndices("other-ds")
600+
.get()
601+
.getRestoreInfo();
602+
603+
assertEquals(restoreSnapshotResponse.successfulShards(), restoreSnapshotResponse.totalShards());
604+
assertEquals(restoreSnapshotResponse.failedShards(), 0);
561605
}
562606
}

0 commit comments

Comments
 (0)