Skip to content

Utility methods to add and remove backing indices from data streams #77778

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.lucene.index.PointValues;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -35,6 +36,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {

Expand Down Expand Up @@ -178,8 +180,20 @@ public DataStream rollover(Metadata clusterMetadata, String writeIndexUuid) {
*
* @param index the backing index to remove
* @return new {@code DataStream} instance with the remaining backing indices
* @throws IllegalArgumentException if {@code index} is not a backing index or is the current write index of the data stream
*/
public DataStream removeBackingIndex(Index index) {
int backingIndexPosition = indices.indexOf(index);

if (backingIndexPosition == -1) {
throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s]",
index.getName(), name));
}
if (generation == (backingIndexPosition + 1)) {
throw new IllegalArgumentException(String.format(Locale.ROOT, "cannot remove backing index [%s] of data stream [%s] because " +
"it is the write index", index.getName(), name));
}

List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.remove(index);
assert backingIndices.size() == indices.size() - 1;
Expand All @@ -200,7 +214,7 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
List<Index> backingIndices = new ArrayList<>(indices);
int backingIndexPosition = backingIndices.indexOf(existingBackingIndex);
if (backingIndexPosition == -1) {
throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s] ",
throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s]",
existingBackingIndex.getName(), name));
}
if (generation == (backingIndexPosition + 1)) {
Expand All @@ -211,6 +225,53 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system);
}

/**
* Adds the specified index as a backing index and returns a new {@code DataStream} instance with the new combination
* of backing indices.
*
* @param index index to add to the data stream
* @return new {@code DataStream} instance with the added backing index
* @throws IllegalArgumentException if {@code index} is ineligible to be a backing index for the data stream
*/
public DataStream addBackingIndex(Metadata clusterMetadata, Index index) {
// validate that index is not part of another data stream
final var parentDataStream = clusterMetadata.getIndicesLookup().get(index.getName()).getParentDataStream();
if (parentDataStream != null) {
if (parentDataStream.getDataStream().equals(this)) {
return this;
} else {
throw new IllegalArgumentException(
String.format(Locale.ROOT,
"cannot add index [%s] to data stream [%s] because it is already a backing index on data stream [%s]",
index.getName(),
getName(),
parentDataStream.getName()
)
);
}
}

// ensure that no aliases reference index
IndexMetadata im = clusterMetadata.getIndicesLookup().get(index.getName()).getWriteIndex();
if (im.getAliases().size() > 0) {
throw new IllegalArgumentException(
String.format(Locale.ROOT,
"cannot add index [%s] to data stream [%s] until its alias(es) [%s] are removed",
index.getName(),
getName(),
Strings.collectionToCommaDelimitedString(
im.getAliases().stream().map(Map.Entry::getKey).sorted().collect(Collectors.toList())
)
)
);
}

List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.add(0, index);
assert backingIndices.size() == indices.size() + 1;
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden, replicated, system);
}

public DataStream promoteDataStream() {
return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false, system, timeProvider);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -16,6 +17,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -103,6 +105,243 @@ public void testRemoveBackingIndex() {
}
}

public void testRemoveBackingIndexThatDoesNotExist() {
int numBackingIndices = randomIntBetween(2, 32);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);

List<Index> indices = new ArrayList<>(numBackingIndices);
for (int k = 1; k <= numBackingIndices; k++) {
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k), UUIDs.randomBase64UUID(random())));
}
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);

final Index indexToRemove = new Index(randomAlphaOfLength(4), UUIDs.randomBase64UUID(random()));

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> original.removeBackingIndex(indexToRemove)
);
assertThat(
e.getMessage(),
equalTo(
String.format(
Locale.ROOT,
"index [%s] is not part of data stream [%s]",
indexToRemove.getName(),
dataStreamName)
)
);
}

public void testRemoveBackingWriteIndex() {
int numBackingIndices = randomIntBetween(2, 32);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);

List<Index> indices = new ArrayList<>(numBackingIndices);
for (int k = 1; k <= numBackingIndices; k++) {
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k), UUIDs.randomBase64UUID(random())));
}
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> original.removeBackingIndex(indices.get(numBackingIndices - 1))
);
assertThat(
e.getMessage(),
equalTo(
String.format(
Locale.ROOT,
"cannot remove backing index [%s] of data stream [%s] because it is the write index",
indices.get(numBackingIndices - 1).getName(),
dataStreamName
)
)
);
}

public void testAddBackingIndex() {
int numBackingIndices = randomIntBetween(2, 32);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final long epochMillis = System.currentTimeMillis();

List<Index> indices = new ArrayList<>(numBackingIndices);
for (int k = 1; k <= numBackingIndices; k++) {
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k, epochMillis), UUIDs.randomBase64UUID(random())));
}

Metadata.Builder builder = Metadata.builder();
for (int k = 1; k <= numBackingIndices; k++) {
IndexMetadata im = IndexMetadata.builder(indices.get(k - 1).getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
builder.put(im, false);
}
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
builder.put(original);
Index indexToAdd = new Index(randomAlphaOfLength(4), UUIDs.randomBase64UUID(random()));
builder.put(
IndexMetadata
.builder(indexToAdd.getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build(),
false
);

DataStream updated = original.addBackingIndex(builder.build(), indexToAdd);
assertThat(updated.getName(), equalTo(original.getName()));
assertThat(updated.getGeneration(), equalTo(original.getGeneration() + 1));
assertThat(updated.getTimeStampField(), equalTo(original.getTimeStampField()));
assertThat(updated.getIndices().size(), equalTo(numBackingIndices + 1));
for (int k = 1; k <= numBackingIndices; k++) {
assertThat(updated.getIndices().get(k), equalTo(original.getIndices().get(k - 1)));
}
assertThat(updated.getIndices().get(0), equalTo(indexToAdd));
}

public void testAddBackingIndexThatIsPartOfAnotherDataStream() {
int numBackingIndices = randomIntBetween(2, 32);
final String dsName1 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String dsName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);

final long epochMillis = System.currentTimeMillis();

List<Index> indices1 = new ArrayList<>(numBackingIndices);
List<Index> indices2 = new ArrayList<>(numBackingIndices);
for (int k = 1; k <= numBackingIndices; k++) {
indices1.add(new Index(DataStream.getDefaultBackingIndexName(dsName1, k, epochMillis), UUIDs.randomBase64UUID(random())));
indices2.add(new Index(DataStream.getDefaultBackingIndexName(dsName2, k, epochMillis), UUIDs.randomBase64UUID(random())));
}

Metadata.Builder builder = Metadata.builder();
for (int k = 1; k <= numBackingIndices; k++) {
IndexMetadata im = IndexMetadata.builder(indices1.get(k - 1).getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
builder.put(im, false);
im = IndexMetadata.builder(indices2.get(k - 1).getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
builder.put(im, false);
}
DataStream ds1 = new DataStream(dsName1, createTimestampField("@timestamp"), indices1);
DataStream ds2 = new DataStream(dsName2, createTimestampField("@timestamp"), indices2);
builder.put(ds1);
builder.put(ds2);

Index indexToAdd = randomFrom(indices2.toArray(Index.EMPTY_ARRAY));

IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ds1.addBackingIndex(builder.build(), indexToAdd));
assertThat(
e.getMessage(),
equalTo(
String.format(
Locale.ROOT,
"cannot add index [%s] to data stream [%s] because it is already a backing index on data stream [%s]",
indexToAdd.getName(),
ds1.getName(),
ds2.getName()
)
)
);
}

public void testAddExistingBackingIndex() {
int numBackingIndices = randomIntBetween(2, 32);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final long epochMillis = System.currentTimeMillis();

List<Index> indices = new ArrayList<>(numBackingIndices);
for (int k = 1; k <= numBackingIndices; k++) {
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k, epochMillis), UUIDs.randomBase64UUID(random())));
}

Metadata.Builder builder = Metadata.builder();
for (int k = 1; k <= numBackingIndices; k++) {
IndexMetadata im = IndexMetadata.builder(indices.get(k - 1).getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
builder.put(im, false);
}
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
builder.put(original);
Index indexToAdd = randomFrom(indices.toArray(Index.EMPTY_ARRAY));

DataStream updated = original.addBackingIndex(builder.build(), indexToAdd);
assertThat(updated.getName(), equalTo(original.getName()));
assertThat(updated.getGeneration(), equalTo(original.getGeneration()));
assertThat(updated.getTimeStampField(), equalTo(original.getTimeStampField()));
assertThat(updated.getIndices().size(), equalTo(numBackingIndices));
for (int k = 0; k < numBackingIndices; k++) {
assertThat(updated.getIndices().get(k), equalTo(original.getIndices().get(k)));
}
}

public void testAddBackingIndexWithAliases() {
int numBackingIndices = randomIntBetween(2, 32);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final long epochMillis = System.currentTimeMillis();

List<Index> indices = new ArrayList<>(numBackingIndices);
for (int k = 1; k <= numBackingIndices; k++) {
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k, epochMillis), UUIDs.randomBase64UUID(random())));
}

Metadata.Builder builder = Metadata.builder();
for (int k = 1; k <= numBackingIndices; k++) {
IndexMetadata im = IndexMetadata.builder(indices.get(k - 1).getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
builder.put(im, false);
}
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
builder.put(original);

Index indexToAdd = new Index(randomAlphaOfLength(4), UUIDs.randomBase64UUID(random()));
IndexMetadata.Builder b = IndexMetadata
.builder(indexToAdd.getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1);
final int numAliases = randomIntBetween(1, 3);
final String[] aliasNames = new String[numAliases];
for (int k = 0; k < numAliases; k++) {
aliasNames[k] = randomAlphaOfLength(6);
b.putAlias(AliasMetadata.builder(aliasNames[k]));
}
builder.put(b.build(), false);
Arrays.sort(aliasNames);

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> original.addBackingIndex(builder.build(), indexToAdd)
);
assertThat(
e.getMessage(),
equalTo(
String.format(
Locale.ROOT,
"cannot add index [%s] to data stream [%s] until its alias(es) [%s] are removed",
indexToAdd.getName(),
original.getName(),
Strings.arrayToCommaDelimitedString(aliasNames)
)
)
);
}

public void testDefaultBackingIndexName() {
// this test does little more than flag that changing the default naming convention for backing indices
// will also require changing a lot of hard-coded values in REST tests and docs
Expand Down