diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 70557878772e6..b0d81bad9d984 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.PointValues; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; +import org.elasticsearch.common.Strings; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.io.stream.StreamInput; @@ -35,6 +36,7 @@ import java.util.Map; import java.util.Objects; import java.util.function.LongSupplier; +import java.util.stream.Collectors; public final class DataStream extends AbstractDiffable implements ToXContentObject { @@ -178,8 +180,20 @@ public DataStream rollover(Metadata clusterMetadata, String writeIndexUuid) { * * @param index the backing index to remove * @return new {@code DataStream} instance with the remaining backing indices + * @throws IllegalArgumentException if {@code index} is not a backing index or is the current write index of the data stream */ public DataStream removeBackingIndex(Index index) { + int backingIndexPosition = indices.indexOf(index); + + if (backingIndexPosition == -1) { + throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s]", + index.getName(), name)); + } + if (generation == (backingIndexPosition + 1)) { + throw new IllegalArgumentException(String.format(Locale.ROOT, "cannot remove backing index [%s] of data stream [%s] because " + + "it is the write index", index.getName(), name)); + } + List backingIndices = new ArrayList<>(indices); backingIndices.remove(index); assert backingIndices.size() == indices.size() - 1; @@ -200,7 +214,7 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki List backingIndices = new ArrayList<>(indices); int backingIndexPosition = backingIndices.indexOf(existingBackingIndex); if (backingIndexPosition == -1) { - throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s] ", + throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s]", existingBackingIndex.getName(), name)); } if (generation == (backingIndexPosition + 1)) { @@ -211,6 +225,53 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system); } + /** + * Adds the specified index as a backing index and returns a new {@code DataStream} instance with the new combination + * of backing indices. + * + * @param index index to add to the data stream + * @return new {@code DataStream} instance with the added backing index + * @throws IllegalArgumentException if {@code index} is ineligible to be a backing index for the data stream + */ + public DataStream addBackingIndex(Metadata clusterMetadata, Index index) { + // validate that index is not part of another data stream + final var parentDataStream = clusterMetadata.getIndicesLookup().get(index.getName()).getParentDataStream(); + if (parentDataStream != null) { + if (parentDataStream.getDataStream().equals(this)) { + return this; + } else { + throw new IllegalArgumentException( + String.format(Locale.ROOT, + "cannot add index [%s] to data stream [%s] because it is already a backing index on data stream [%s]", + index.getName(), + getName(), + parentDataStream.getName() + ) + ); + } + } + + // ensure that no aliases reference index + IndexMetadata im = clusterMetadata.getIndicesLookup().get(index.getName()).getWriteIndex(); + if (im.getAliases().size() > 0) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, + "cannot add index [%s] to data stream [%s] until its alias(es) [%s] are removed", + index.getName(), + getName(), + Strings.collectionToCommaDelimitedString( + im.getAliases().stream().map(Map.Entry::getKey).sorted().collect(Collectors.toList()) + ) + ) + ); + } + + List backingIndices = new ArrayList<>(indices); + backingIndices.add(0, index); + assert backingIndices.size() == indices.size() + 1; + return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden, replicated, system); + } + public DataStream promoteDataStream() { return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false, system, timeProvider); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 970f2f0f78a80..eff33cf8779a7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.Version; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; @@ -16,6 +17,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -103,6 +105,243 @@ public void testRemoveBackingIndex() { } } + public void testRemoveBackingIndexThatDoesNotExist() { + int numBackingIndices = randomIntBetween(2, 32); + String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + + List indices = new ArrayList<>(numBackingIndices); + for (int k = 1; k <= numBackingIndices; k++) { + indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k), UUIDs.randomBase64UUID(random()))); + } + DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices); + + final Index indexToRemove = new Index(randomAlphaOfLength(4), UUIDs.randomBase64UUID(random())); + + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> original.removeBackingIndex(indexToRemove) + ); + assertThat( + e.getMessage(), + equalTo( + String.format( + Locale.ROOT, + "index [%s] is not part of data stream [%s]", + indexToRemove.getName(), + dataStreamName) + ) + ); + } + + public void testRemoveBackingWriteIndex() { + int numBackingIndices = randomIntBetween(2, 32); + String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + + List indices = new ArrayList<>(numBackingIndices); + for (int k = 1; k <= numBackingIndices; k++) { + indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k), UUIDs.randomBase64UUID(random()))); + } + DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices); + + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> original.removeBackingIndex(indices.get(numBackingIndices - 1)) + ); + assertThat( + e.getMessage(), + equalTo( + String.format( + Locale.ROOT, + "cannot remove backing index [%s] of data stream [%s] because it is the write index", + indices.get(numBackingIndices - 1).getName(), + dataStreamName + ) + ) + ); + } + + public void testAddBackingIndex() { + int numBackingIndices = randomIntBetween(2, 32); + String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final long epochMillis = System.currentTimeMillis(); + + List indices = new ArrayList<>(numBackingIndices); + for (int k = 1; k <= numBackingIndices; k++) { + indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k, epochMillis), UUIDs.randomBase64UUID(random()))); + } + + Metadata.Builder builder = Metadata.builder(); + for (int k = 1; k <= numBackingIndices; k++) { + IndexMetadata im = IndexMetadata.builder(indices.get(k - 1).getName()) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + builder.put(im, false); + } + DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices); + builder.put(original); + Index indexToAdd = new Index(randomAlphaOfLength(4), UUIDs.randomBase64UUID(random())); + builder.put( + IndexMetadata + .builder(indexToAdd.getName()) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .build(), + false + ); + + DataStream updated = original.addBackingIndex(builder.build(), indexToAdd); + assertThat(updated.getName(), equalTo(original.getName())); + assertThat(updated.getGeneration(), equalTo(original.getGeneration() + 1)); + assertThat(updated.getTimeStampField(), equalTo(original.getTimeStampField())); + assertThat(updated.getIndices().size(), equalTo(numBackingIndices + 1)); + for (int k = 1; k <= numBackingIndices; k++) { + assertThat(updated.getIndices().get(k), equalTo(original.getIndices().get(k - 1))); + } + assertThat(updated.getIndices().get(0), equalTo(indexToAdd)); + } + + public void testAddBackingIndexThatIsPartOfAnotherDataStream() { + int numBackingIndices = randomIntBetween(2, 32); + final String dsName1 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final String dsName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + + final long epochMillis = System.currentTimeMillis(); + + List indices1 = new ArrayList<>(numBackingIndices); + List indices2 = new ArrayList<>(numBackingIndices); + for (int k = 1; k <= numBackingIndices; k++) { + indices1.add(new Index(DataStream.getDefaultBackingIndexName(dsName1, k, epochMillis), UUIDs.randomBase64UUID(random()))); + indices2.add(new Index(DataStream.getDefaultBackingIndexName(dsName2, k, epochMillis), UUIDs.randomBase64UUID(random()))); + } + + Metadata.Builder builder = Metadata.builder(); + for (int k = 1; k <= numBackingIndices; k++) { + IndexMetadata im = IndexMetadata.builder(indices1.get(k - 1).getName()) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + builder.put(im, false); + im = IndexMetadata.builder(indices2.get(k - 1).getName()) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + builder.put(im, false); + } + DataStream ds1 = new DataStream(dsName1, createTimestampField("@timestamp"), indices1); + DataStream ds2 = new DataStream(dsName2, createTimestampField("@timestamp"), indices2); + builder.put(ds1); + builder.put(ds2); + + Index indexToAdd = randomFrom(indices2.toArray(Index.EMPTY_ARRAY)); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ds1.addBackingIndex(builder.build(), indexToAdd)); + assertThat( + e.getMessage(), + equalTo( + String.format( + Locale.ROOT, + "cannot add index [%s] to data stream [%s] because it is already a backing index on data stream [%s]", + indexToAdd.getName(), + ds1.getName(), + ds2.getName() + ) + ) + ); + } + + public void testAddExistingBackingIndex() { + int numBackingIndices = randomIntBetween(2, 32); + String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final long epochMillis = System.currentTimeMillis(); + + List indices = new ArrayList<>(numBackingIndices); + for (int k = 1; k <= numBackingIndices; k++) { + indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k, epochMillis), UUIDs.randomBase64UUID(random()))); + } + + Metadata.Builder builder = Metadata.builder(); + for (int k = 1; k <= numBackingIndices; k++) { + IndexMetadata im = IndexMetadata.builder(indices.get(k - 1).getName()) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + builder.put(im, false); + } + DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices); + builder.put(original); + Index indexToAdd = randomFrom(indices.toArray(Index.EMPTY_ARRAY)); + + DataStream updated = original.addBackingIndex(builder.build(), indexToAdd); + assertThat(updated.getName(), equalTo(original.getName())); + assertThat(updated.getGeneration(), equalTo(original.getGeneration())); + assertThat(updated.getTimeStampField(), equalTo(original.getTimeStampField())); + assertThat(updated.getIndices().size(), equalTo(numBackingIndices)); + for (int k = 0; k < numBackingIndices; k++) { + assertThat(updated.getIndices().get(k), equalTo(original.getIndices().get(k))); + } + } + + public void testAddBackingIndexWithAliases() { + int numBackingIndices = randomIntBetween(2, 32); + String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final long epochMillis = System.currentTimeMillis(); + + List indices = new ArrayList<>(numBackingIndices); + for (int k = 1; k <= numBackingIndices; k++) { + indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k, epochMillis), UUIDs.randomBase64UUID(random()))); + } + + Metadata.Builder builder = Metadata.builder(); + for (int k = 1; k <= numBackingIndices; k++) { + IndexMetadata im = IndexMetadata.builder(indices.get(k - 1).getName()) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + builder.put(im, false); + } + DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices); + builder.put(original); + + Index indexToAdd = new Index(randomAlphaOfLength(4), UUIDs.randomBase64UUID(random())); + IndexMetadata.Builder b = IndexMetadata + .builder(indexToAdd.getName()) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1); + final int numAliases = randomIntBetween(1, 3); + final String[] aliasNames = new String[numAliases]; + for (int k = 0; k < numAliases; k++) { + aliasNames[k] = randomAlphaOfLength(6); + b.putAlias(AliasMetadata.builder(aliasNames[k])); + } + builder.put(b.build(), false); + Arrays.sort(aliasNames); + + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> original.addBackingIndex(builder.build(), indexToAdd) + ); + assertThat( + e.getMessage(), + equalTo( + String.format( + Locale.ROOT, + "cannot add index [%s] to data stream [%s] until its alias(es) [%s] are removed", + indexToAdd.getName(), + original.getName(), + Strings.arrayToCommaDelimitedString(aliasNames) + ) + ) + ); + } + public void testDefaultBackingIndexName() { // this test does little more than flag that changing the default naming convention for backing indices // will also require changing a lot of hard-coded values in REST tests and docs