Skip to content

Commit 3828bc0

Browse files
Improve Partial Snapshot Rollover Behavior (elastic#69364)
Using new reconciliation functionality to not needlessly drop rolling over data streams from the final snapshot. closes elastic#68536
1 parent bd4a585 commit 3828bc0

File tree

4 files changed

+73
-29
lines changed

4 files changed

+73
-29
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import java.io.IOException;
2626
import java.util.ArrayList;
27+
import java.util.Collection;
2728
import java.util.Collections;
2829
import java.util.HashMap;
2930
import java.util.List;
@@ -177,15 +178,19 @@ public DataStream promoteDataStream() {
177178
* stream definitions that do not reference backing indices not contained in the snapshot.
178179
*
179180
* @param indicesInSnapshot List of indices in the snapshot
180-
* @return Reconciled {@link DataStream} instance
181+
* @return Reconciled {@link DataStream} instance or {@code null} if no reconciled version of this data stream could be built from the
182+
* given indices
181183
*/
182-
public DataStream snapshot(List<String> indicesInSnapshot) {
184+
@Nullable
185+
public DataStream snapshot(Collection<String> indicesInSnapshot) {
183186
// do not include indices not available in the snapshot
184187
List<Index> reconciledIndices = new ArrayList<>(this.indices);
185-
reconciledIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false);
188+
if (reconciledIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false) == false) {
189+
return this;
190+
}
186191

187192
if (reconciledIndices.size() == 0) {
188-
throw new IllegalArgumentException("cannot reconcile data stream without at least one backing index");
193+
return null;
189194
}
190195

191196
return new DataStream(

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,8 +1122,9 @@ private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot,
11221122
break;
11231123
}
11241124
}
1125-
if (missingIndex == false) {
1126-
dataStreams.put(dataStreamName, dataStream);
1125+
final DataStream reconciled = missingIndex ? dataStream.snapshot(indicesInSnapshot) : dataStream;
1126+
if (reconciled != null) {
1127+
dataStreams.put(dataStreamName, reconciled);
11271128
}
11281129
}
11291130
}

server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.stream.Collectors;
2525

2626
import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
27-
import static org.hamcrest.Matchers.containsString;
2827
import static org.hamcrest.Matchers.equalTo;
2928
import static org.hamcrest.Matchers.everyItem;
3029
import static org.hamcrest.Matchers.hasItems;
@@ -217,12 +216,7 @@ public void testSnapshotWithAllBackingIndicesRemoved() {
217216
preSnapshotDataStream.isReplicated()
218217
);
219218

220-
IllegalArgumentException e = expectThrows(
221-
IllegalArgumentException.class,
222-
() -> postSnapshotDataStream.snapshot(
223-
preSnapshotDataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList())
224-
)
225-
);
226-
assertThat(e.getMessage(), containsString("cannot reconcile data stream without at least one backing index"));
219+
assertNull(postSnapshotDataStream.snapshot(
220+
preSnapshotDataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList())));
227221
}
228222
}

x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@
5252
import static org.hamcrest.Matchers.containsString;
5353
import static org.hamcrest.Matchers.empty;
5454
import static org.hamcrest.Matchers.equalTo;
55+
import static org.hamcrest.Matchers.hasItems;
5556
import static org.hamcrest.Matchers.is;
57+
import static org.hamcrest.Matchers.not;
5658
import static org.hamcrest.Matchers.nullValue;
5759

5860
@ESIntegTestCase.ClusterScope(transportClientRatio = 0)
@@ -546,22 +548,64 @@ public void testSnapshotDSDuringRollover() throws Exception {
546548
unblockAllDataNodes(repoName);
547549
final SnapshotInfo snapshotInfo = assertSuccessful(snapshotFuture);
548550

549-
if (snapshotInfo.dataStreams().contains("ds")) {
550-
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" })).get());
551+
assertThat(snapshotInfo.dataStreams(), hasItems("ds"));
552+
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" })).get());
551553

552-
RestoreInfo restoreSnapshotResponse = client().admin()
553-
.cluster()
554-
.prepareRestoreSnapshot(repoName, snapshotName)
555-
.setWaitForCompletion(true)
556-
.setIndices("ds")
557-
.get()
558-
.getRestoreInfo();
554+
RestoreInfo restoreSnapshotResponse = client().admin()
555+
.cluster()
556+
.prepareRestoreSnapshot(repoName, snapshotName)
557+
.setWaitForCompletion(true)
558+
.setIndices("ds")
559+
.get()
560+
.getRestoreInfo();
559561

560-
assertEquals(restoreSnapshotResponse.successfulShards(), restoreSnapshotResponse.totalShards());
561-
assertEquals(restoreSnapshotResponse.failedShards(), 0);
562-
assertFalse(partial);
563-
} else {
564-
assertTrue(partial);
565-
}
562+
assertEquals(restoreSnapshotResponse.successfulShards(), restoreSnapshotResponse.totalShards());
563+
assertEquals(restoreSnapshotResponse.failedShards(), 0);
564+
}
565+
566+
public void testSnapshotDSDuringRolloverAndDeleteOldIndex() throws Exception {
567+
// repository consistency check requires at least one snapshot per registered repository
568+
createFullSnapshot(REPO, "snap-so-repo-checks-pass");
569+
final String repoName = "mock-repo";
570+
createRepository(repoName, "mock");
571+
blockAllDataNodes(repoName);
572+
final String snapshotName = "ds-snap";
573+
final ActionFuture<CreateSnapshotResponse> snapshotFuture = client().admin()
574+
.cluster()
575+
.prepareCreateSnapshot(repoName, snapshotName)
576+
.setWaitForCompletion(true)
577+
.setPartial(true)
578+
.setIncludeGlobalState(randomBoolean())
579+
.execute();
580+
waitForBlockOnAnyDataNode(repoName);
581+
awaitNumberOfSnapshotsInProgress(1);
582+
final RolloverResponse rolloverResponse = client().admin().indices().rolloverIndex(new RolloverRequest("ds", null)).get();
583+
assertTrue(rolloverResponse.isRolledOver());
584+
585+
logger.info("--> deleting former write index");
586+
assertAcked(client().admin().indices().prepareDelete(rolloverResponse.getOldIndex()));
587+
588+
unblockAllDataNodes(repoName);
589+
final SnapshotInfo snapshotInfo = assertSuccessful(snapshotFuture);
590+
591+
assertThat(
592+
"snapshot should not contain 'ds' since none of its indices existed both at the start and at the end of the snapshot",
593+
snapshotInfo.dataStreams(),
594+
not(hasItems("ds"))
595+
);
596+
assertAcked(
597+
client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "other-ds" })).get()
598+
);
599+
600+
RestoreInfo restoreSnapshotResponse = client().admin()
601+
.cluster()
602+
.prepareRestoreSnapshot(repoName, snapshotName)
603+
.setWaitForCompletion(true)
604+
.setIndices("other-ds")
605+
.get()
606+
.getRestoreInfo();
607+
608+
assertEquals(restoreSnapshotResponse.successfulShards(), restoreSnapshotResponse.totalShards());
609+
assertEquals(restoreSnapshotResponse.failedShards(), 0);
566610
}
567611
}

0 commit comments

Comments
 (0)